You need to enable JavaScript to run this app.
导航
消费者最佳实践
最近更新时间:2024.11.22 15:59:33首次发布时间:2024.11.22 15:59:33

本文档介绍使用火山引擎 BMQ 实例时的消费者最佳实践。

基本概念

BMQ 消费场景下,消费者与服务端交互过程如下图所示。

Image

什么是消费者

订阅并消费特定 Topic 数据的客户端进程被称为消费者(Consumer)。消费者并不是单独存在的,而是隶属于一个消费组(Consumer Group)。一个消费组可以包含多个消费者,它们共同并行地消费同一个Topic 的数据,每个消费者只负责处理部分分区的数据,这种设计允许 BMQ 在处理大规模数据流时,通过增加消费者数量来水平扩展,以提升消费能力。

什么是消费组

一个消费组中可包含多个消费者,隶属于同一个消费组中的消费者订阅或消费同一个 Topic,每个消费者根据分配策略,只消费一个 Topic 中的部分分区数据。例如 Topic A 有 4 个分区,消费组 ConsumerGroup A 中只有一个消费者 Consumer A1,则 Consumer A1 会消费 Topic A 中所有分区的数据;如果 ConsumerGroup A 中有 2 个消费者 Consumer A1 和 Consumer A2,则 Consumer A1 会消费 Topic A 中 2 个分区的数据,Consumer A2 会消费另外 2 个分区的数据。

什么是 Kafka 消费进度

每个消费组通过提交位点信息来跟踪其消费进度,客户端支持主动提交和定时提交 2 种位点提交机制,对于重复消费或者消费侧丢失消息较敏感的业务可以根据业务特性进行配置。
消费进度通常通过 Lag 来标识,计算公式为 Lag = MaxOffset(HW)- CommittedOffset,其中:

  • MaxOffset(HW): 分区中记录的最新位点。
  • CommittedOffset :消费组最后一次提交的位点,CommittedOffset 之前的数据为消费组已成功消费的数据。

当服务端不存在消费组提交的位点信息时,BMQ 客户端可以通过设置 auto.offset.reset 策略来指定开始消费的位置,支持从最早记录开始消费或者从最新记录开始消费。

什么是 HighLevel 消费

HighLevel 消费模型的消费协调工作由服务端 Coordinator 组件管理。在 HighLevel 消费模型中,消费者无需决策要消费的分区,服务端会根据客户端配置的 PARTITION_ASSIGNMENT_STRATEGY_CONFIG 参数来分配分区,该参数默认取值 RangeAssignor,默认将分区均匀的分配给组内所有消费者。

什么是 LowLevel 消费

LowLevel 消费模型的消费协调工作由消费者主动管理。LowLevel 消费模型通常适用于需要保持状态的业务服务,处于特定状态的服务需要消费固定的分区,在此场景下,消费者需要主动指定各自消费的分区,而无需服务端 Coordinator 组件介入。

HighLevel消费建议配置

配置建议

通用建议

  • 评估每条消息的处理时间:避免在处理消息时,同步执行各种延迟时间不可控的远程调用,或者复杂、耗时的资源处理。
  • 一个消费组只消费一个 Topic:BMQ 允许一个消费组消费多个 Topic,在此场景下,任何 Topic 数据的处理延迟都可能导致整个消费组的 Rebalance。因此,建议一对一消费,从而提升整体消费稳定性,并简化问题诊断过程。

参数配置建议

参数

说明

默认值

配置建议

session.timeout.ms

心跳超时时间,超过此时间没有收到消费者发送的心跳,服务端会触发消费组的 Rebalance。

30s

  • 根据实际业务场景或者消费能力适当调大。
  • 大于max.poll.records × 客户端每条消息处理时间)

max.poll.records

单次 poll 调用获取的最大消息数。

说明

客户端在调用 poll 方法时,会触发心跳线程,向服务端发送心跳请求。

500

根据每条消息的处理时长进行配置,从而控制客户端 poll 调用的频率。
建议设置为(session.timeout.ms × 客户端每秒可处理的消息条数)。

配置示例

//在控制台查看对应接入点信息
String server = "xxx.";
//在控制台申请的消息所属Topic
String topic = "this is your topic.";
//在控制台申请消费消息的consumerGroup
String group = "this is your group.";
//消费offset策略:earliest, latest, none
String offsetReset = "earliest";

Properties properties = new Properties();

// 必要参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// 根据也场景调优参数
// 若设置为false,则需要手动提交位点,即调用consumer.commitSync()方法。
// 默认值为true即自动提交位点,每隔5s会将当前消费的位点提交到Kafka。
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

// 心跳的间隔,即每隔多少ms会向服务端发送心跳,默认值为3s。
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

// 服务端如果在session.timeout.ms时间内没有收到心跳,则认为当前session过期,会触发rebalance。
// 建议适当调大此值,比如设置为60s。
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);

// 调用poll方法一次返回的最大记录数,默认值为500,建议调小此值比如100.
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);


// 用于指定两次poll的最大时间间隔,默认5分钟,如果超过了该间隔会触发rebalance。
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
List<String> topicList = Lists.newArrayList(topic);
consumer.subscribe(topicList);
while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.key(),
                    record.value());
        }
    } catch (Exception e) {
        logger.error("consume error", e);
    }
}

LowLevel 消费建议配置

配置建议

  • 一个进程中启动一个消费者:建议在一个进程中运行单个消费者实例,以便在分析消费者行为或排查问题时,可通过 trace(跟踪)、dump(转储)等操作,获取相关对象状态或调用栈信息。
  • 避免长时间不调用 poll 方法:确保消费者定期调用 poll 方法,建议控制调用频率在秒级别内,避免因长时间未调用 poll 方法而导致消费者未能及时从 socket 缓存区读取服务端已返回的数据,从而引发服务端写入操作的阻塞。
  • max.poll.records 参数:单次 poll 调用获取的最大消息数量。减少消息数量可以缩短 poll 调用的时间间隔,促进网络层面的频繁交互。此参数默认取值为 500,您可根据实际业务场景或消费能力适当调小。

配置示例

//在控制台查看对应接入点信息
String server = "xxx.";
//在控制台申请的消息所属Topic
String topic = "this is your topic.";
//在控制台申请消费消息的consumerGroup
String group = "this is your group.";
//分区值partition
int partition = 0;

Properties properties = new Properties();

// 必要参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// 根据也场景调优参数
// 若设置为false,则需要手动提交位点,即调用consumer.commitSync()方法。
// 默认值为true即自动提交位点,每隔5s会将当前消费的位点提交到Kafka。
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

// 调用poll方法一次返回的最大记录数,默认值为500,建议调小此值比如100.
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

// 分配分区,不同的节点或者进程或者线程需要assign不同的分区
consumer.assign(Collections.singletonList(new TopicPartition(topic, partition)));

while (true) {
    try {
        // poll方法的延迟,不要在
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.key(),
                    record.value());
        }
    } catch (Exception e) {
        logger.error("consume error", e);
    }
}

常用参数说明

说明

以下参数的默认值基于 Java SDK 2.3.0 版本,在不同的开发语言或版本的 SDK 中可能会有所不同。

参数

默认值

参数说明

fetch.min.bytes

1 bytes

单次请求,服务端返回最小数据量。

fetch.max.wait.ms

500 ms

单次请求,服务端返回数据的最长等待时间。
fetch.max.wait.ms 参数与 fetch.min.bytes 参数需搭配使用,在消费者发起请求时,若服务端暂无可返回的数据,则服务端将自动进入等待返回状态并积累数据,积累到足量数据或者等待时间达最大等待时长时,直接返回数据。

fetch.max.bytes

50 M

单次请求服务端返回的最大数据量。

  • 在公网环境下建议适当调小此参数,公网环境可能存在带宽有限或者有其他流量占用带宽的情况,当服务端返回的数据量较大(超过 50MB)时,可能会触发限流,导致客户端断连等问题。
  • 需根据客户端的内存配置适当调整此参数,以减少在内存使用不受控情况下的请求次数,从而减轻客户端和服务端的负载。

max.parttion.fetch.bytes

1 M

服务端每个分区返回给消费者的最大数据量。
fetch.max.bytes 参数同时生效。

session.timeout.ms

30 s

心跳超时时间。
超过此时间没有收到消费者发送的心跳,服务端会触发消费组的 Rebalance。
可根据实际业务场景或者消费能力适当调大,建议大于max.poll.records × 每条消息处理时间),确保消费者在所设时间内可至少调用一次 poll 方法,触发心跳线程,发送心跳请求。

auto.offset.reset

latest

服务端无消费组提交的位点信息,或者消费者收到 outofrange 异常时,客户端的消费策略。取值如下:

  • latest:从最新的记录开始消费。
  • earliest:从最早记录开始消费。

注意

如果您的业务场景要求尽量避免丢失消息,可将此参数设置为 earliest

enable.auto.commit

true

客户端的消费位点提交机制,取值包括:

  • true:定时提交,每隔 5s 自动提交一次位点信息。
  • false:主动提交,您可根据业务场景,自定义位点提交逻辑,指定在消费完成时提交位点信息,或者提前提交位点信息,避免重复或消费侧数据丢失。

partition.asssignment.startegy

RangeAssingnor

分区分配策略,服务端将根据此参数设置进行分区分配,取值包括 RangeAssingnorRoundRobin,同时支持自定义分配策略。

client.id

客户端根据一定规则自动配置

客户端的唯一标识,用于服务端在记录日志和监控指标时标识特定的客户端。

max.poll.records

500

单次 poll 调用获取的最大消息数。根据每条消息的处理时长进行配置,从而控制客户端 poll 调用的频率。
建议设置为(session.timeout.ms × 客户端每秒可处理的消息条数)。

说明

客户端在调用 poll 方法时,会触发心跳线程,向服务端发送心跳请求。

receive.buffer.bytes

不涉及

Socket 在读取数据时使用的 TCP 缓冲区的大小。
当生产者或消费者与服务端处于不同的 Region,或者在跨可用区、跨云厂商使用场景下,可调大此参数。

send.buffer.bytes

常见问题

消费者频繁出现 Rebalance

  • 原因
    出现消费者 Rebalance 的原因通常为服务端在 session.time.out 时间内没有收到消费者发送的心跳,判定消费者已经离开消费组,触发 Rebalance。
  • 解决方案
    可通过调整以下参数来减少或者避免:
    • session.timeout.ms:适当调大此参数,调整逻辑为大于max.poll.records × 每条消息的处理时间),确保消费者在心跳超时时间内可至少调用一次 poll 方法,触发心跳线程,发送心跳。
    • max.poll.records:适当调小此参数,调整的逻辑为小于(session.timeout.ms × 客户端每秒可处理的消息条数)。

如何提高消费能力

如果消费能力跟不上消息生产速度,会导致 Lag 不断增加。
您可通过以下方式来调整消费能力:

  • 扩容消费实例个数
    消费实例个数最多可扩容至与分区数量相等。若已扩容到分区数,则通过增加分区数来进一步扩容消费实例。
  • 获取消息后异步处理
    消费速度慢的主要原因之一是数据处理能力不足,采用异步处理机制可以防止数据拉取过程被阻塞,请注意异步化处理本身并不会优化既有的数据处理逻辑。为了提升处理效率,可以通过增加异步处理的并发量来实现。
  • 获取消息后并发处理
    通常情况下,一次 poll 调用会获取多条消息,可以通过并发处理机制提升整体消费能力。
  • 提升依赖服务的处理能力
    如果处理数据会依赖其他下游的服务,可以通过提升依赖下游的处理能力来提升消费的能力。

公网拉取数据量过大导致连接断开

  • 原因
    fetch.max.bytes 的默认值为 50M,当消费者在公网消费时,若购买的公网带宽小于 50M,则可能会由于公网带宽的限制导致消费者与服务端的连接被断开。
  • 解决方案
    调整以下参数,限制消费者每次获取的数据量:
    • fetch.max.bytes:调整为小于公网带宽。在一些测试场景下,可以将该参数设置为更小的值,例如 10K(10240bytes)等。
    • max.partition.fetch.bytes:建议调整为 fetch.max.bytes 参数取值的 1/4。