Kafka 连接器提供从 Kafka Topic 或 BMQ Topic 中消费和写入数据的能力,支持做数据源表和结果表。您可以创建 source 流从 Kafka Topic 中获取数据,作为作业的输入数据;也可以通过 Kafka 结果表将作业输出数据写入到 Kafka Topic 中。
CREATE TABLE kafka_source ( name String, score INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_topic_01', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'test_topic_01', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' );
CREATE TABLE kafka_sink ( name String, score INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_topic_01', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'csv' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处仅支持 Kafka 连接器。 注意 Kafka-0.10 和 Kafka-0.11 两个版本的连接器使用的 Kafka 客户端有缺陷,在某些情况下可能无法自动提交 Kafka offset 信息。 |
topic | 是 | (none) | String | 指定 Kafka Topic 的名称。 |
properties.bootstrap.servers | 是 | (none) | String | 指定 Kafka Broker 的地址,格式为 |
properties.group.id | 是 | (none) | String | 指定 Kafka 消费组的 ID。 注意 在 Flink 中使用 Kafka 连接器消费 BMQ 消息时,需要提前在 BMQ 平台侧创建 Consumer Group。 |
properties.batch.size | 否 | 16 | string | 单个 Partition 对应的 Batch 中支持写入的最大字节数,默认值为 16 KB。
说明 如果在写 Kafka 数据时出现吞吐量不足,建议您提升 batch.size 取值,一般设置为 128KB。 |
properties.linger.ms | 否 | 0 | string | 消息在 Batch 中的停留时间,即发送消息前的等待时长。默认为 0 毫秒,表示“立即发送消息”。
说明 如果在写 Kafka 数据时出现吞吐量不足,建议您提升 linger.ms 取值,一般设置为 100ms。 |
properties.buffer.memory | 否 | 32M | string | 缓存消息的总可用 Memory 空间,如果 Memory 用完,则会立即发送缓存中的消息。
说明 如果 buffer.memory 较小,可能会造成 Batch 失效,从而导致 QPS 升高被下游限流等问题。 |
properties.enable.idempotence | 否 | true | Boolean | 是否启用 Kafka 连接器的幂等性。默认为 true,表示启用幂等性。 注意 如果您通过 Kafka 连接器连接 BMQ 资源,且使用 Flink 1.16-volcano 引擎版本,那么必须将 |
scan.topic-partition-discovery.interval | 否 | none | Duration | 在 Kafka/BMQ 动态扩容的场景下,用于定期扫描并发现新的 Topic 和 Partition 的时间间隔,推荐设置为 120s。 注意 默认值是 none,代表不开启。建议您在任务中添加该参数配置,设置动态检测的时间间隔。 |
format | 是 | (none) | String | 用来反序列化 Kafka 消息体(value)时使用的格式。支持的格式如下:
|
scan.startup.mode | 否 | group-offsets | String | 读取数据时的启动模式。 取值如下:
|
scan.startup.specific-offsets | 否 | (none) | String | 在 specific-offsets 启动模式下,指定每个分区的启动偏移量。如 |
scan.startup.timestamp-millis | 否 | (none) | Long | 在 timestamp 启动模式下,指定启动位点时间戳,单位毫秒。 |
scan.parallelism | 否 | (none) | Integer | 单独设置 Source 并发。如果不设置,则并行度为作业的默认并发数。 |
sink.partitioner | 否 | fixed | String | Flink 分区到 Kafka 分区的映射关系。取值如下:
|
sink.parallelism | 否 | (none) | Integer | 单独设置 Kafka Sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
目前一般使用以下两种方式自动提交 Kafka Offsets。
注意
依赖 Flink 任务 Checkpoint 来管理 Kafka Offsets 时,如果上游数据量很大,很可能会触发上游的 LAG 告警。
enable.auto.commit
和auto.commit.interval.ms
两个参数来控制位点定时自动提交。-- 是否自动提交 Offsets。取值为 true 表示自动提交 Offsets;取值为 false,表示手动同步或异步提交。 'enable.auto.commit' = 'true', -- 自动提交 Offsets 的时间间隔,单位为 ms。 'auto.commit.interval.ms' = '500',
如果 Kafka 集群要求安全连接或认证,您需要在 WITH 参数中通过 properties.
前缀添加安全认证相关配置。
示例 1:使用 SASL_PLAINTEXT 安全协议,SASL 机制为 PLAIN 。
CREATE TABLE KafkaTable ( user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', ... -- 配置安全协议为 SASL_PLAINTEXT。 'properties.security.protocol' = 'SASL_PLAINTEXT', -- 配置 SASL 机制为 PLAIN。 'properties.sasl.mechanism' = 'PLAIN', -- 配置 JAAS。 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="yourname" password="yourpassword";' )
示例 2:使用 SASL_SSL 安全协议, SASL 机制为 SASL_SSL,并配置证书和密钥信息。
CREATE TABLE KafkaTable ( user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', ... -- 配置安全协议为 SASL_SSL。 'properties.security.protocol' = 'SASL_SSL', -- 配置服务端提供的 truststore (CA 证书) 的路径和密码。 'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks', 'properties.ssl.truststore.password' = 'test1234', -- 如果要求客户端认证,则需要配置 keystore (私钥) 的路径和密码。 'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks', 'properties.ssl.keystore.password' = 'test1234', --配置 SASL 机制为 SCRAM-SHA-256。 'properties.sasl.mechanism' = 'SCRAM-SHA-256', -- 配置 JAAS。 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="yourname" password="yourpassword";' )
源表示例
CREATE TABLE kafka_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'kafka', 'topic' = 'source', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'group_01', -- 消费BMQ消息时,需要提前在BMQ平台侧创建Group,否则不能正常提交Offset。 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); CREATE TABLE print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'print' ); insert into print_sink select * from kafka_source;
结果表示例
CREATE TABLE datagen_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5' ); CREATE TABLE kafka_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'kafka', 'topic' = 'sink', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'group_01', 'format' = 'json' ); insert into kafka_sink select * from datagen_source;
如果修改了作业的拓扑结构,新增了一些算子,则不能从历史状态恢复,此时可以丢状态,并从 timestamp 或者 group-offsets 模式启动:
适用于之前的作业没开过 checkpoint。需要从数据曲线-业务延迟监控里,找到当前消费延迟,反推出当前消费的时间点,从这个时间点恢复即可。
比如业务延迟是 1h,当前时刻是 12 点,则反推出已消费时间点是 11 点,为了避免丢数,则可以根据情况往前多消费一点,比如往前 30min,那么要设置的恢复的时刻就是 10:30。
参数如下:
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = 'xxxx' -- 毫秒时间戳,可以从 https://tool.chinaz.com/tools/unixtime.aspx 获取
适用于之前的作业开过 checkpoint,并完成过 checkpoint。由于 checkpoint 时,kafka source 会向 kafka 实例提交 offset,因此 offset 会保存一份到 kafka 实例。
此时需要把 kafka source 里的 startup mode 改为 group-offsets,参数如下:
'scan.startup.mode' = 'group-offsets'