You need to enable JavaScript to run this app.
导航
通过 Flink Connector驱动导入
最近更新时间:2025.01.21 10:34:38首次发布时间:2024.01.10 15:26:33

Flink Connector for ByteHouse 连接器专门用于通过 Flink 将数据加载到 ByteHouse。
本文将介绍通过 Table API&SQL 和 Flink 的 DataStreamAPI 两种方式连接ByteHouse并处理数据。

准备工作

根据您安装的 Flink 版本,下载匹配的 Flink SQL 或 Flink DataStream API 驱动。
请访问 Flink Connector 获取最新的驱动程序。

安装

本地安装

要将下载的flink-sql-connector-bytehouse-ce-${flink-sql-connector-bytehouse-ce. version}.jar文件安装到本地Maven存储库中,请运行以下命令:

mvn install:install-file \
    -Dfile=${your_path}/flink-sql-connector-bytehouse-ce-${flink-sql-connector-bytehouse-ce.version}.jar \
    -DgroupId=com.bytedance.bytehouse \
    -DartifactId=flink-sql-connector-bytehouse-ce \
    -Dversion=${flink-sql-connector-bytehouse-ce.version} \
    -Dpackaging=jar

现在您可以使用IDE(例如IntelliJ)将其作为依赖项添加到项目中。

Maven依赖

对于要使用 Flink connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的pom.xml文件中。与 Flink 发行版的关联 Scala 版本相对应。${flink-sql-connector-bytehouse-ce.version}可以设置为所需的 Flink connector 连接器版本。

<dependency>
    <groupId>com.bytedance.bytehouse</groupId>
    <artifactId>flink-sql-connector-bytehouse-ce</artifactId>
    <version>${flink-sql-connector-bytehouse-ce.version}</version>
</dependency>

使用示例

下面是通过 FlinkSQL 将数据表单加载到 ByteHouse 企业版数据表中的示例。

说明

  • 您可参见获取集群连接信息页面来获取需要连接的集群连接信息,并替换下面对应的占位符。
  • 详细 参数说明 附在文末,可供查询。
CREATE TEMPORARY TABLE `bh_ce_source` (  -- source table
  `id` BIGINT NOT NULL,
  `time` TIMESTAMP(0),
  `content` ARRAY<DECIMAL(20, 0)>
) WITH (
  'connector' = 'kinesis',
  'stream' = 'demo_stream',
  'format' = 'json',
  'aws.region' = 'cn-north-1',
  'aws.credentials.provider' = 'BASIC',
  'aws.credentials.basic.accesskeyid' = '???',
  'aws.credentials.basic.secretkey' = '???',
  'scan.shard.getrecords.maxretries' = '7'
);

CREATE TEMPORARY TABLE `bh_ce_sink` (  -- sink table
  `id` STRING NOT NULL,
  `event_time` STRING,
  `content` ARRAY<DECIMAL(20, 0)>
) WITH (
  'connector' = 'bytehouse-ce',
  'clickhouse.shard-discovery.kind' = 'CE_API_CLUSTERS',
  'clickhouse.shard-discovery.service.host' = 'XXXXXXXXXXXXXXXX.bytehouse-ce.ivolces.com',
  'clickhouse.shard-discovery.service.port' = '80',
  'bytehouse-ce.api.get-shard-info' = '/openapi/v1/clusters/{cluster_name}/nodes',
  'bytehouse-ce.api.account-id' = '<< your Volcano account ID >>',
  'username' = '<< your ByteHouse CE username >>',
  'password' = '<< your ByteHouse CE password >>',
  'sink.overload.log-level' = 'DEBUG',
  'sink.buffer-flush.interval' = '1 second',
  'sink.buffer-flush.max-rows' = '5000',
  'sink.parallelism' = '10',  -- Recommended to be no more than the number of shards
  'clickhouse.cluster' = '<< your cluster name of ByteHouse CE >>',
  'database' = 'my_db',
  'table-name' = 'my_table_local',  -- MUST BE LOCAL TABLE
  'sink.group-by.key' = 'event_time',
  'sink.group-by.expression' = 'cityHash64(event_time)',
  'sink.group-by.number-of-groups' = '16',  -- Need to be n (n > 0) times of the number of shards
  'sharding-strategy' = 'HASH',
  'sharding-key' = 'event_time',
  'sharding-expression' = 'cityHash64(event_time)'
);

INSERT INTO `bh_ce_sink` (  -- data loading
  `id`,
  `event_time`,
  `content`
)
SELECT
  IFNULL(CAST(`id` AS STRING), '43'),  -- type casting with default value
  CAST(`time` AS STRING),  -- type casting
  IFNULL(`content`, ARRAY[123,12345])  -- default value
FROM `bh_ce_source`

下面是通过 Flink DataStream API 将数据表单加载到 ByteHouse 企业版数据表中的示例。

说明

  • 您可参见获取集群连接信息页面来获取需要连接的集群连接信息,并替换下面对应的占位符。
  • 详细 参数说明 附在文末,可供查询。
package bytehouse.flink.connectoer.demo;
import com.bytedance.bytehouse.flink.connector.clickhouse.ClickHouseSinkFunction;
import com.bytedance.bytehouse.flink.connector.clickhouse.api.java.ClickHouseSinkFunctionBuilder;
import com.bytedance.bytehouse.flink.table.api.RowDataConstructor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
public class AcceptLogSinkDataStreamExample {
    public static void main(String[] args) throws Exception {
        final String gatewayHost = args[0];
        final String username = args[1];
        final String password = args[2];
        final String clusterName = args[3];
        final String dbName = args[4];
        final String tableName = args[5];
        final long rowsPerSecond = Long.parseLong(args[6]);
        final long numberOfRows = Long.parseLong(args[7]);
        // Creating the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Adding a source to the data stream
        DataStream<RowData> dataStream =
                env.addSource(
                                new DataGeneratorSource<>(new RowDataGenerator(), rowsPerSecond, numberOfRows))
                        .returns(TypeInformation.of(RowData.class));
        // List of columns representing the table schema
        List<Column> columns =
                Arrays.asList(
                        Column.physical("create_time", DataTypes.TIMESTAMP(0)),
                        Column.physical("completion_id", DataTypes.STRING()),
                        Column.physical("device_mid", DataTypes.STRING()));
        try (@SuppressWarnings("unchecked")
             ClickHouseSinkFunction<RowData> sink =
                     new ClickHouseSinkFunctionBuilder.Upsert(clusterName, dbName, tableName)
                             .withSchema(columns)
                             .withShardDiscoveryKind("CE_GATEWAY")
                             .withGateway(gatewayHost)
                             .withAccount(username, password)
                             .withPrimaryKey(Arrays.asList("completion_id"))
                             .withShardingKey("completion_id")
                             .withFlushInterval(Duration.ofSeconds(1))
                             .build()) {
            // Add the sink to the data stream
            dataStream.addSink(sink);
            // Trigger the execution
            env.execute();
        }
    }
    static class RowDataGenerator implements DataGenerator<RowData> {
        private final Random random = new Random();
        private final RowDataConstructor rowDataConstructor =
                RowDataConstructor.apply(
                        new DataType[] {DataTypes.TIMESTAMP(0), DataTypes.STRING(), DataTypes.STRING()});
        @Override
        public RowData next() {
            final Object[] rowDataFields = {
                    LocalDateTime.now(), "CID-" + random.nextInt(1000), "DMID-" + random.nextInt(1000)
            };
            return rowDataConstructor.constructInsert(rowDataFields);
        }
        @Override
        public void open(
                String name, FunctionInitializationContext context, RuntimeContext runtimeContext)
                throws Exception {
            // Initialization code here (if needed)
        }
        @Override
        public boolean hasNext() {
            return true; // Continuous generation
        }
    }
}

参数说明

参数

必选

默认值

数据类型

描述

connector

(none)

String

指定要使用的驱动,这里应该是 bytehouse-ce(ByteHouse企业版) 或者 clickhouse

database

(none)

String

需要连接的 ByteHouse 企业版数据库的名称。

table-name

(none)

String

需要连接的 ByteHouse 企业版表的名称。 注意,这里的“表”是指每个分片的本地表。

clickhouse.cluster

(none)

String

ByteHouse 集群的名称。

clickhouse.shard-discovery.kind

SYSTEM_CLUSTERS

String

ClickHouse 分片发现的类型。 支持的值为:

  • SYSTEM_CLUSTERS: 通过system.clusters 表检索分片IP。
  • CONSUL: 通过 Consul 服务检索分片IP。
  • CE_API_CLUSTERS: 通过 ByteHouse 企业版特定的 OpenAPI 检索分片 IP。
  • CE_GATEWAY: 通过 ByteHouse 企业版网关检索分片 IP。

clickhouse.shard-discovery.service.host

127.0.0.1

String

ByteHouse 分片发现服务的主机名。

clickhouse.shard-discovery.service.port

8123

Integer

ByteHouse分片发现服务的端口号。

clickhouse.shard-discovery.address-mapping

(none)

Map

已发现分片的地址映射。

bytehouse-ce.api.get-consul-info

(built-in)

String

用于检索 Consul 服务信息的 ByteHouse API。

bytehouse-ce.api.get-shard-info

(built-in)

String

用于检索分片信息的 ByteHouse API。

bytehouse-ce.auth.api

(built-in)

String

使用 ByteHouse API 的身份验证密钥。

bytehouse.ce.gateway.host

(none)

String

ByteHouse 企业版 网关的主机。

bytehouse.ce.gateway.port

8123

Integer

ByteHouse 企业版 网关的端口。

username

(none)

String

JDBC 连接用户名。 一旦指定,环境变量 CLICKHOUSE_USERNAME 将被忽略。
如果使用usernamepassword,则必须同时指定两者的值。

password

(none)

String

JDBC 连接密码。 一旦指定,环境变量 CLICKHOUSE_USERNAME 将被忽略。

sharding-strategy

NONE

String

ByteHouse分布式分区策略。 支持的值为:

  • NONE: 不分区。
  • RANDOM: 随机分区。
  • ROUND_ROBIN: 根据数据到达的顺序进行分区。
  • HASH: 根据数据内容进行分区。

sharding-key

(none)

String

哈希分区的键。 键可以包含多个字段,以逗号分隔。

sharding-expression

(none)

String

哈希分区的表达式。 如果设置此项,则所有涉及的字段名称也必须在分区键partition-key中列出。

sink.buffer-flush.interval

1 second

Duration

两次批量刷新之间的最大间隔。 该时间最少为 200 毫秒。

sink.buffer-flush.max-rows

50,000

Integer

刷新前缓冲记录的最大值。 该值最少为 100

sink.buffer-flush.max-batches

6

Integer

通过异步刷新触发过载预防的待处理批次数量的阈值。 该值最小为 1

sink.max-retries

-1

Integer

刷新数据失败时的最大重试次数。 设置为-1意味着无限次重试。

sink.parallelism

(none)

Integer

刷新数据的最大并行度。 默认情况下,并行度由框架使用上游链式运算符的相同并行度来确定。

sink.proactive-validate

false

Boolean

指定是否启用主动数据验证(即在添加到批次之前要验证的每条记录)。 默认情况下,使用被动数据验证(即仅在数据刷新尝试失败时触发数据验证)来减少运行时开销。

metrics.update-interval

5 seconds

Duration

刷新指标的固定间隔。 该时间最少为 5 秒。

metrics.log-level

INFO

String

记录指标的日志级别。 这对应于 Log4J 内置的标准日志级别

timestamp-offset

(none)

Duration

时间戳数据的附加时间偏移量。

lookup.async.scale-factor

1

Integer

异步查找的比例因子。如果设置为小于 2,查找将以同步 (SYNC) 模式运行。如果设置为 n(n >= 2),查找将以异步 (ASYNC) 模式运行,每个查找实例的最大并发数等于 n。

lookup.cache.max-rows

0

Long

数据表的查找缓存的最大行数。如果设置为 0,则禁用缓存。

lookup.cache.ttl

(none)

Duration

ByteHouse 表的查找缓存的 TTL。如果未指定,则不为缓存记录设置 TTL。

lookup.max-retries

3

Integer

查找操作失败时允许的最大重试次数。