HaKafka 是一种特殊的表引擎,修改自社区 Kafka 引擎。使用 Kafka / HaKafka 引擎可以订阅 Kafka 上的 topic,拉取并解析 topic 中的消息,然后通过 MaterializedView 将 Kafka/HaKafka 解析到的数据写入到目标表(一般为HaMergeTree)。
在 ByteHouse GUI 中,创建 Kafka 导入任务,底层即为创建了 HaKafka 和 MaterializedView 两张表。
在 ByteHouse 中,社区的 Kafka 引擎目前基本上未做改动,不具备高可用的功能,不推荐使用,以下仅介绍 HaKafka。
建一张 HaKafka 的语法如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = HaKafka('{shard}', '{replica}') SETTINGS (.....)
参数名 | 类型 | 必填/默认值 | 说明 |
---|---|---|---|
kafka_broker_list | String | 必填 | ip:port。可以多个,逗号分隔。 |
kafka_topic_list | String | 必填 | 可以多个,逗号分隔。 |
kafka_group_name | String | 必填 | 消费组名称。 |
kafka_format | String | 必填 | 消息格式;目前最常用 JSONEachRow。 |
kafka_row_delimiter | String | '\0' | 一般使用 '\n'。 |
kafka_schema | String | '' | protobuf 格式需要这个参数。 |
kafka_num_consumers | UInt64 | 1 | 消费者个数,每个消费者会创建一个线程。 |
kafka_max_block_size | UInt64 | 65536 | 写入block_size |
kafka_leader_priority | String | '0' | 会存储到zk上,互为主备的一对(组)消费者,仅leader_priority最小的会开启消费。其他节点的表不会消费。可被macro替换。 |
kafka_partition_num | String | '-1' | -1 表示使用动态分配(kafka subscribe API);
|
kafka_shard_count | String | '1' | 集群shard数,决定静态分配的分配规则。 |
kafka_auto_offset_reset | String | '' | 启动消费时或者数据过期时,offset的设置方式,可填:"earliest", "latest"。 |
extra_librdkafka_config | String | '' | JSON 形式;可以透传任何 librdkafka 支持的参数。请参考:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md。 |
Name | Default | Description |
---|---|---|
stream_flush_interval_ms | 7500 | ms, 每次消费batch的POLL数据的超时时间。 |
stream_poll_timeout_ms | 500 | ms, rdkafka poll 数据的等待时间,影响stop consume和超时判断。 |
kafka_session_timeout_ms | 180000 | ms, kafka session 超时时间,仅静态分配时会生效。 |
kafka_max_partition_fetch_bytes | 1048576 | bytes, 从 topic partition 拉去数据的最大大小,影响POLL数据的性能。 |
你可以通过控制面自动建导入任务,但若你需要手动建导入任务,则需要建一张 HaKafka 表,一张 Materialized View 物化视图,以及一张 HaMergeTree(或 HaUniqueMergeTree)的底表。以下展示了手动建表的示例。
CREATE TABLE TEST.CONSUMER_tce_service_resource_usage_local ON CLUSTER default ( `current_ts` Int64, `psm` String, `cpu_limit_pod` Float64 ) ENGINE = HaKafka('/clickhouse/experiment/TEST.CONSUMER_tce_service_resource_usage_local_233/{shard}', '{replica}') SETTINGS kafka_cluster = 'xxx_cluster', -- 替换成kafka集群名 kafka_topic_list = 'xxx_topic', -- 替换成业务方的topic kafka_group_name = 'xxx_group_name', -- 替换成自己想取的 group_name kafka_format = 'JSONEachRow', -- 一般用json kafka_row_delimiter = '\n', -- 一般是 \n
建好之后,可以直接从表中 SELECT 数据 (一般用来debug,不能在线上使用)
SELECT * FROM TEST.CONSUMER_tce_service_resource_usage_local LIMIT 3 FORMAT CSVWithNames; -- "current_ts","psm","cpu_limit_pod" -- 1567666740,"toutiao.video.user_packer",4 -- 1567666740,"ad.bi.datacore",4 -- 1567666740,"tce.sysprobe.probeagent",2
HaKafka 表定义了如何从 Kafka topic 中消费(解析)数据,为了将数据写入磁盘中,还需要建立一张 HaMergeTree 为引擎的目标表(或 HaUniqueMergeTree 为引擎),以及一张物化视图。
--- 准备一张存储数据的HaMergeTree表, CREATE TABLE TEST.tce_service_resource_usage_local ON CLUSTER default ( `current_ts` Int64, `psm` String, `cpu_limit_pod` Float64 ) ENGINE = HaMergeTree('/clickhouse/experiment/TEST.tce_service_resource_usage_local_666/{shard}', '{replica}') PARTITION BY toDate(current_ts) ORDER BY psm --- 再建一张物化视图,表示把数据从HaKafka表中SELECT出来写入到HaMergeTree表 CREATE MATERIALIZED VIEW TEST.VIEW_tce_service_resource_usage_local ON CLUSTER default TO TEST.tce_service_resource_usage_local AS SELECT current_ts, psm, cpu_limit_pod FROM TEST.CONSUMER_TEST_tce_service_resource_usage_local
三张表建好之后,每隔一段时间 (取决于 stream_flush_interval_ms 参数和数据持久化的时间),数据会写入目标表中。之后只需要查询目标表即可。
SQL 语法:
SYSTEM START/STOP/RESTART CONSUME <table_name>
语义:
开启/关闭/重启 HaKafka engine 的消费。将会改变HaKafka表的消费状态(ON/OFF),并持久化到磁盘,重启之后仍然保持原有状态。
通过如下语句可以修改 HaKafka 表设置,修改成功后会自动重启消费
ALTER TABLE db.table MODIFY SETTING <name1> = <value1>, <name2> = <value2>
有的业务方需要获取 Kafka 消息的元数据(e.g. 消息的partition, offset等)。我们可以使用 virtual columns 功能来满足这个需求。virtual columns 不需要在建表的时候指定,是表引擎本身的属性。在SELECT语句查询时可以显式选出 virtual columns(同样可以放到VIEW表的SELECT语句中):
-- SELECT _topic, -- String _partition, -- UInt64 _key, -- String _offset, -- UInt64 _content, -- String: 完整的消息内容 * -- 正常列可以通过*展开,虚拟列则不能 FROM TEST.CONSUMER_tce_service_resource_usage_local LIMIT 10