Flink Connector for ByteHouse 连接器专门用于通过 Flink 将数据加载到 ByteHouse。
本文将介绍通过 Table API&SQL 和 Flink 的 DataStreamAPI 两种方式连接ByteHouse并处理数据。
根据您安装的 Flink 版本,下载匹配的 Flink SQL 或 Flink DataStream API 驱动。
请访问 Flink Connector 获取最新的驱动程序。
要将下载的flink-sql-connector-bytehouse-ce-${flink-sql-connector-bytehouse-ce. version}.jar
文件安装到本地Maven存储库中,请运行以下命令:
mvn install:install-file \ -Dfile=${your_path}/flink-sql-connector-bytehouse-ce-${flink-sql-connector-bytehouse-ce.version}.jar \ -DgroupId=com.bytedance.bytehouse \ -DartifactId=flink-sql-connector-bytehouse-ce \ -Dversion=${flink-sql-connector-bytehouse-ce.version} \ -Dpackaging=jar
现在您可以使用IDE(例如IntelliJ)将其作为依赖项添加到项目中。
对于要使用 Flink connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的pom.xml
文件中。与 Flink 发行版的关联 Scala 版本相对应。${flink-sql-connector-bytehouse-ce.version}
可以设置为所需的 Flink connector 连接器版本。
<dependency> <groupId>com.bytedance.bytehouse</groupId> <artifactId>flink-sql-connector-bytehouse-ce</artifactId> <version>${flink-sql-connector-bytehouse-ce.version}</version> </dependency>
下面是通过 FlinkSQL 将数据表单加载到 ByteHouse 企业版数据表中的示例。
说明
CREATE TEMPORARY TABLE `bh_ce_source` ( -- source table `id` BIGINT NOT NULL, `time` TIMESTAMP(0), `content` ARRAY<DECIMAL(20, 0)> ) WITH ( 'connector' = 'kinesis', 'stream' = 'demo_stream', 'format' = 'json', 'aws.region' = 'cn-north-1', 'aws.credentials.provider' = 'BASIC', 'aws.credentials.basic.accesskeyid' = '???', 'aws.credentials.basic.secretkey' = '???', 'scan.shard.getrecords.maxretries' = '7' ); CREATE TEMPORARY TABLE `bh_ce_sink` ( -- sink table `id` STRING NOT NULL, `event_time` STRING, `content` ARRAY<DECIMAL(20, 0)> ) WITH ( 'connector' = 'bytehouse-ce', 'clickhouse.shard-discovery.kind' = 'CE_API_CLUSTERS', 'clickhouse.shard-discovery.service.host' = 'XXXXXXXXXXXXXXXX.bytehouse-ce.ivolces.com', 'clickhouse.shard-discovery.service.port' = '80', 'bytehouse-ce.api.get-shard-info' = '/openapi/v1/clusters/{cluster_name}/nodes', 'bytehouse-ce.api.account-id' = '<< your Volcano account ID >>', 'username' = '<< your ByteHouse CE username >>', 'password' = '<< your ByteHouse CE password >>', 'sink.overload.log-level' = 'DEBUG', 'sink.buffer-flush.interval' = '1 second', 'sink.buffer-flush.max-rows' = '5000', 'sink.parallelism' = '10', -- Recommended to be no more than the number of shards 'clickhouse.cluster' = '<< your cluster name of ByteHouse CE >>', 'database' = 'my_db', 'table-name' = 'my_table_local', -- MUST BE LOCAL TABLE 'sink.group-by.key' = 'event_time', 'sink.group-by.expression' = 'cityHash64(event_time)', 'sink.group-by.number-of-groups' = '16', -- Need to be n (n > 0) times of the number of shards 'sharding-strategy' = 'HASH', 'sharding-key' = 'event_time', 'sharding-expression' = 'cityHash64(event_time)' ); INSERT INTO `bh_ce_sink` ( -- data loading `id`, `event_time`, `content` ) SELECT IFNULL(CAST(`id` AS STRING), '43'), -- type casting with default value CAST(`time` AS STRING), -- type casting IFNULL(`content`, ARRAY[123,12345]) -- default value FROM `bh_ce_source`
下面是通过 Flink DataStream API 将数据表单加载到 ByteHouse 企业版数据表中的示例。
说明
package bytehouse.flink.connectoer.demo; import com.bytedance.bytehouse.flink.connector.clickhouse.ClickHouseSinkFunction; import com.bytedance.bytehouse.flink.connector.clickhouse.api.java.ClickHouseSinkFunctionBuilder; import com.bytedance.bytehouse.flink.table.api.RowDataConstructor; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator; import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import java.time.Duration; import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; import java.util.Random; public class AcceptLogSinkDataStreamExample { public static void main(String[] args) throws Exception { final String gatewayHost = args[0]; final String username = args[1]; final String password = args[2]; final String clusterName = args[3]; final String dbName = args[4]; final String tableName = args[5]; final long rowsPerSecond = Long.parseLong(args[6]); final long numberOfRows = Long.parseLong(args[7]); // Creating the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Adding a source to the data stream DataStream<RowData> dataStream = env.addSource( new DataGeneratorSource<>(new RowDataGenerator(), rowsPerSecond, numberOfRows)) .returns(TypeInformation.of(RowData.class)); // List of columns representing the table schema List<Column> columns = Arrays.asList( Column.physical("create_time", DataTypes.TIMESTAMP(0)), Column.physical("completion_id", DataTypes.STRING()), Column.physical("device_mid", DataTypes.STRING())); try (@SuppressWarnings("unchecked") ClickHouseSinkFunction<RowData> sink = new ClickHouseSinkFunctionBuilder.Upsert(clusterName, dbName, tableName) .withSchema(columns) .withShardDiscoveryKind("CE_GATEWAY") .withGateway(gatewayHost) .withAccount(username, password) .withPrimaryKey(Arrays.asList("completion_id")) .withShardingKey("completion_id") .withFlushInterval(Duration.ofSeconds(1)) .build()) { // Add the sink to the data stream dataStream.addSink(sink); // Trigger the execution env.execute(); } } static class RowDataGenerator implements DataGenerator<RowData> { private final Random random = new Random(); private final RowDataConstructor rowDataConstructor = RowDataConstructor.apply( new DataType[] {DataTypes.TIMESTAMP(0), DataTypes.STRING(), DataTypes.STRING()}); @Override public RowData next() { final Object[] rowDataFields = { LocalDateTime.now(), "CID-" + random.nextInt(1000), "DMID-" + random.nextInt(1000) }; return rowDataConstructor.constructInsert(rowDataFields); } @Override public void open( String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception { // Initialization code here (if needed) } @Override public boolean hasNext() { return true; // Continuous generation } } }
参数 | 必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
| 是 | (none) | String | 指定要使用的驱动,这里应该是 |
| 是 | (none) | String | 需要连接的 ByteHouse 企业版数据库的名称。 |
| 是 | (none) | String | 需要连接的 ByteHouse 企业版表的名称。 注意,这里的“表”是指每个分片的本地表。 |
| 是 | (none) | String | ByteHouse 集群的名称。 |
| 否 |
| String | ClickHouse 分片发现的类型。 支持的值为:
|
| 否 |
| String | ByteHouse 分片发现服务的主机名。 |
| 否 |
| Integer | ByteHouse分片发现服务的端口号。 |
| 否 | (none) | Map | 已发现分片的地址映射。 |
| 否 | (built-in) | String | 用于检索 Consul 服务信息的 ByteHouse API。 |
| 否 | (built-in) | String | 用于检索分片信息的 ByteHouse API。 |
| 否 | (built-in) | String | 使用 ByteHouse API 的身份验证密钥。 |
| 否 | (none) | String | ByteHouse 企业版 网关的主机。 |
| 否 |
| Integer | ByteHouse 企业版 网关的端口。 |
| 否 | (none) | String | JDBC 连接用户名。 一旦指定,环境变量 |
| 否 | (none) | String | JDBC 连接密码。 一旦指定,环境变量 |
| 否 |
| String | ByteHouse分布式分区策略。 支持的值为:
|
| 否 | (none) | String | 哈希分区的键。 键可以包含多个字段,以逗号分隔。 |
| 否 | (none) | String | 哈希分区的表达式。 如果设置此项,则所有涉及的字段名称也必须在分区键 |
| 否 |
| Duration | 两次批量刷新之间的最大间隔。 该时间最少为 200 毫秒。 |
| 否 |
| Integer | 刷新前缓冲记录的最大值。 该值最少为 |
| 否 |
| Integer | 通过异步刷新触发过载预防的待处理批次数量的阈值。 该值最小为 |
| 否 |
| Integer | 刷新数据失败时的最大重试次数。 设置为 |
| 否 | (none) | Integer | 刷新数据的最大并行度。 默认情况下,并行度由框架使用上游链式运算符的相同并行度来确定。 |
| 否 |
| Boolean | 指定是否启用主动数据验证(即在添加到批次之前要验证的每条记录)。 默认情况下,使用被动数据验证(即仅在数据刷新尝试失败时触发数据验证)来减少运行时开销。 |
| 否 |
| Duration | 刷新指标的固定间隔。 该时间最少为 |
| 否 |
| String | 记录指标的日志级别。 这对应于 Log4J 内置的标准日志级别。 |
| 否 | (none) | Duration | 时间戳数据的附加时间偏移量。 |
| 否 |
| Integer | 异步查找的比例因子。如果设置为小于 2,查找将以同步 (SYNC) 模式运行。如果设置为 n(n >= 2),查找将以异步 (ASYNC) 模式运行,每个查找实例的最大并发数等于 n。 |
| 否 |
| Long | 数据表的查找缓存的最大行数。如果设置为 0,则禁用缓存。 |
| 否 | (none) | Duration | ByteHouse 表的查找缓存的 TTL。如果未指定,则不为缓存记录设置 TTL。 |
| 否 |
| Integer | 查找操作失败时允许的最大重试次数。 |