You need to enable JavaScript to run this app.
导航
Kafka/BMQ
最近更新时间:2024.10.23 19:54:09首次发布时间:2022.09.08 17:27:42

Kafka 连接器提供从 Kafka Topic 或 BMQ Topic 中消费和写入数据的能力,支持做数据源表和结果表。您可以创建 source 流从 Kafka Topic 中获取数据,作为作业的输入数据;也可以通过 Kafka 结果表将作业输出数据写入到 Kafka Topic 中。

注意事项

  • 使用 Flink SQL 的用户需要注意,不再支持 kafka-0.10kafka-0.11 两个版本的连接器,请直接使用 kafka 连接器访问 Kafka 0.10 和 0.11 集群。
    Kafka-0.10 和 Kafka-0.11 两个版本的连接器使用的 Kafka 客户端有缺陷,在某些情况下可能无法自动提交 Kafka offset 信息。
  • 使用 datastream API 开发的用户需要注意,在读 Kafka 消息的时候,不要使用 FlinkKafkaConsumer010FlinkKafkaConsumer011 两个 consumer,请直接使用 FlinkKafkaConsumer 进行开发;在往 Kafka 写消息的时候,不要使用 FlinkKafkaProducer010FlinkKafkaProducer011 两个 producer,请直接使用 FlinkKafkaProducer 进行开发。

DDL 定义

用作数据源(Source)

CREATE TABLE kafka_source (
    name String,
    score INT
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'test_topic_01',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'test_topic_01',
     'format' = 'csv',
     'scan.startup.mode' = 'earliest-offset'
 );

用作数据目的(Sink)

CREATE TABLE kafka_sink (
    name String,
    score INT
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'test_topic_01',
     'properties.bootstrap.servers' = 'localhost:9092',
     'format' = 'csv'
 );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处仅支持 Kafka 连接器。

注意

Kafka-0.10 和 Kafka-0.11 两个版本的连接器使用的 Kafka 客户端有缺陷,在某些情况下可能无法自动提交 Kafka offset 信息。

topic

(none)

String

指定 Kafka Topic 的名称。

properties.bootstrap.servers

(none)

String

指定 Kafka Broker 的地址,格式为host:port

properties.group.id

(none)

String

指定 Kafka 消费组的 ID。

注意

在 Flink 中使用 Kafka 连接器消费 BMQ 消息时,需要提前在 BMQ 平台侧创建 Consumer Group。
如果没有提前创建 Group,任务可以正常运行,但不能正常提交 Offset。

properties.batch.size

16

string

单个 Partition 对应的 Batch 中支持写入的最大字节数,默认值为 16 KB。

  • batch.size=单个 Producer 的消息QPS * 消息大小 * liner.ms/ Partition 数
  • 提升 batch.size 的值,一个 Batch 能写入更多数据,可以提升吞吐量。但是 batch.size 也不能设置太大,以免出现 Batch 迟迟写不满,导致发送消息延迟高。
  • 一般与 properties.linger.msproperties.buffer.memory 参数联合使用,满足任意一个条件都会立即发送消息。

说明

如果在写 Kafka 数据时出现吞吐量不足,建议您提升 batch.size 取值,一般设置为 128KB。

properties.linger.ms

0

string

消息在 Batch 中的停留时间,即发送消息前的等待时长。默认为 0 毫秒,表示“立即发送消息”。

  • 可以适当提升 linger.ms 取值,以引入小延迟为代价,提高吞吐量和压缩率。
  • 该参数一般与 properties.batch.sizeproperties.buffer.memory 参数联合使用,满足任意一个条件都会立即发送消息。

说明

如果在写 Kafka 数据时出现吞吐量不足,建议您提升 linger.ms 取值,一般设置为 100ms。

properties.buffer.memory

32M

string

缓存消息的总可用 Memory 空间,如果 Memory 用完,则会立即发送缓存中的消息。

  • 设置时,建议按照计算公式设置:buffer.memory>=batch.size * partition数*2。
  • 该参数一般与 properties.batch.sizeproperties.linger.ms 参数联合使用,满足任意一个条件都会立即发送消息。

说明

如果 buffer.memory 较小,可能会造成 Batch 失效,从而导致 QPS 升高被下游限流等问题。

properties.enable.idempotence

true

Boolean

是否启用 Kafka 连接器的幂等性。默认为 true,表示启用幂等性。
启用幂等属性后,在面对 Client 重试引起的消息重复时,系统的反应与处理一次的请求相同,能够确保消息的顺序和完整性。

注意

如果您通过 Kafka 连接器连接 BMQ 资源,且使用 Flink 1.16-volcano 引擎版本,那么必须将properties.enable.idempotence参数设置为 false 以关闭幂等,否则任务会运行失败。

scan.topic-partition-discovery.interval

none

Duration

在 Kafka/BMQ 动态扩容的场景下,用于定期扫描并发现新的 Topic 和 Partition 的时间间隔,推荐设置为 120s。

注意

默认值是 none,代表不开启。建议您在任务中添加该参数配置,设置动态检测的时间间隔。
如果任务中不配置该参数,将不会动态发现分区。此时新增分区,将无法读取到新增分区中的数据。

format

(none)

String

用来反序列化 Kafka 消息体(value)时使用的格式。支持的格式如下:

  • CSV
  • JSON
  • Debezium-JSON
  • Canal-JSON
  • Maxwell-JSON
  • Ogg-JSON
  • Avro
  • Confluent Avro
  • RAW

scan.startup.mode

group-offsets

String

读取数据时的启动模式。 取值如下:

  • earliest-offset:从 Kafka 最早分区开始读取。
  • latest-offset:从 Kafka 最新位点开始读取。
  • group-offsets:默认值,根据 Group 读取。
  • timestamp:从 Kafka 指定时间点读取。需要在 WITH 参数中指定 scan.startup.timestamp-millis 参数。
  • specific-offsets:从 Kafka 指定分区目标偏移量读取。需要在 WITH 参数中指定 scan.startup.specific-offsets 参数。

scan.startup.specific-offsets

(none)

String

specific-offsets 启动模式下,指定每个分区的启动偏移量。如partition:0,offset:42;partition:1,offset:300

scan.startup.timestamp-millis

(none)

Long

timestamp 启动模式下,指定启动位点时间戳,单位毫秒。

scan.parallelism

(none)

Integer

单独设置 Source 并发。如果不设置,则并行度为作业的默认并发数。
该参数经常用于 Source 和下游算子需要断开算子链的场景,使得下游重计算的算子能使用较大的默认并发,提高计算能力,同时保持 Source 并发和 Kafka 分区数相等,此时 Source 到下游由于并发不同,数据 Shuffle 是均匀的,从而提高了整体计算速率。

sink.partitioner

fixed

String

Flink 分区到 Kafka 分区的映射关系。取值如下:

  • fixed(默认值):每个 Flink 分区对应一个 Kafka 分区。
  • round-robin:Flink 分区中的数据将被轮流分配至 Kafka 的各个分区。
  • 自定义映射模式:支持创建一个 FlinkKafkaPartitioner 的子类来自定义分区映射模式。例如org.mycompany.MyPartitioner

sink.parallelism

(none)

Integer

单独设置 Kafka Sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

自动提交 Offsets

目前一般使用以下两种方式自动提交 Kafka Offsets。

  • 方式 1:依赖 Flink 任务 Checkpoint。
    Flink 任务开启 Checkpoint 时,Kafka Source 在完成 Checkpoint 时会提交当前的消费位点,以保证 Flink 的 Checkpoint 状态和 Kafka Broker 上的提交位点一致。

    注意

    依赖 Flink 任务 Checkpoint 来管理 Kafka Offsets 时,如果上游数据量很大,很可能会触发上游的 LAG 告警。

  • 方式 2:依赖 Kafka Consumer 的位点定时提交逻辑。
    当 Flink 任务没有开启 Checkpoint 时,Kafka Source 将依赖 Kafka Consumer 的位点定时提交逻辑。您可以通过设置enable.auto.commitauto.commit.interval.ms两个参数来控制位点定时自动提交。
    -- 是否自动提交 Offsets。取值为 true 表示自动提交 Offsets;取值为 false,表示手动同步或异步提交。
      'enable.auto.commit' = 'true',
      -- 自动提交 Offsets 的时间间隔,单位为 ms。
      'auto.commit.interval.ms' = '500',
    

安全与认证

如果 Kafka 集群要求安全连接或认证,您需要在 WITH 参数中通过 properties.前缀添加安全认证相关配置。
示例 1:使用 SASL_PLAINTEXT 安全协议,SASL 机制为 PLAIN 。

CREATE TABLE KafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  ...
  -- 配置安全协议为 SASL_PLAINTEXT。
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  -- 配置 SASL 机制为  PLAIN。
  'properties.sasl.mechanism' = 'PLAIN',
  -- 配置 JAAS。
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="yourname" password="yourpassword";'
)

示例 2:使用 SASL_SSL 安全协议, SASL 机制为 SASL_SSL,并配置证书和密钥信息。

CREATE TABLE KafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  ...
  -- 配置安全协议为 SASL_SSL。
  'properties.security.protocol' = 'SASL_SSL',
  -- 配置服务端提供的 truststore (CA 证书) 的路径和密码。
  'properties.ssl.truststore.location' = '/path/to/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  -- 如果要求客户端认证,则需要配置 keystore (私钥) 的路径和密码。
  'properties.ssl.keystore.location' = '/path/to/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  --配置 SASL 机制为 SCRAM-SHA-256。 
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  -- 配置 JAAS。
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="yourname" password="yourpassword";'
)

示例代码

  • 源表示例

    CREATE TABLE kafka_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
         )
    WITH (
      'connector' = 'kafka',
      'topic' = 'source',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'group_01',  -- 消费BMQ消息时,需要提前在BMQ平台侧创建Group,否则不能正常提交Offset。
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );
    CREATE TABLE print_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
        )
    WITH (
         'connector' = 'print'           
    );
    insert into print_sink
    select * from kafka_source;
    
  • 结果表示例

    CREATE TABLE datagen_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time as localtimestamp
        )
    WITH (
         'connector' = 'datagen',
         'rows-per-second' = '5'          
    );
    
    CREATE TABLE kafka_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
         )
    WITH (
      'connector' = 'kafka',
      'topic' = 'sink',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'group_01',
      'format' = 'json'
    );
    
    insert into kafka_sink
    select * from datagen_source;
    

FAQ

丢状态重启

如果修改了作业的拓扑结构,新增了一些算子,则不能从历史状态恢复,此时可以丢状态,并从 timestamp 或者 group-offsets 模式启动:

  • 从 timestamp 时间戳恢复

适用于之前的作业没开过 checkpoint。需要从数据曲线-业务延迟监控里,找到当前消费延迟,反推出当前消费的时间点,从这个时间点恢复即可。
比如业务延迟是 1h,当前时刻是 12 点,则反推出已消费时间点是 11 点,为了避免丢数,则可以根据情况往前多消费一点,比如往前 30min,那么要设置的恢复的时刻就是 10:30。
参数如下:

'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = 'xxxx' -- 毫秒时间戳,可以从 https://tool.chinaz.com/tools/unixtime.aspx 获取

  • 从 group offsets 恢复

适用于之前的作业开过 checkpoint,并完成过 checkpoint。由于 checkpoint 时,kafka source 会向 kafka 实例提交 offset,因此 offset 会保存一份到 kafka 实例。
此时需要把 kafka source 里的 startup mode 改为 group-offsets,参数如下:

'scan.startup.mode' = 'group-offsets'