本文档介绍使用火山引擎 BMQ 实例时的消费者最佳实践。
BMQ 消费场景下,消费者与服务端交互过程如下图所示。
订阅并消费特定 Topic 数据的客户端进程被称为消费者(Consumer)。消费者并不是单独存在的,而是隶属于一个消费组(Consumer Group)。一个消费组可以包含多个消费者,它们共同并行地消费同一个Topic 的数据,每个消费者只负责处理部分分区的数据,这种设计允许 BMQ 在处理大规模数据流时,通过增加消费者数量来水平扩展,以提升消费能力。
一个消费组中可包含多个消费者,隶属于同一个消费组中的消费者订阅或消费同一个 Topic,每个消费者根据分配策略,只消费一个 Topic 中的部分分区数据。例如 Topic A 有 4 个分区,消费组 ConsumerGroup A 中只有一个消费者 Consumer A1,则 Consumer A1 会消费 Topic A 中所有分区的数据;如果 ConsumerGroup A 中有 2 个消费者 Consumer A1 和 Consumer A2,则 Consumer A1 会消费 Topic A 中 2 个分区的数据,Consumer A2 会消费另外 2 个分区的数据。
每个消费组通过提交位点信息来跟踪其消费进度,客户端支持主动提交和定时提交 2 种位点提交机制,对于重复消费或者消费侧丢失消息较敏感的业务可以根据业务特性进行配置。
消费进度通常通过 Lag
来标识,计算公式为 Lag = MaxOffset(HW)- CommittedOffset
,其中:
MaxOffset(HW)
: 分区中记录的最新位点。CommittedOffset
:消费组最后一次提交的位点,CommittedOffset
之前的数据为消费组已成功消费的数据。当服务端不存在消费组提交的位点信息时,BMQ 客户端可以通过设置 auto.offset.reset
策略来指定开始消费的位置,支持从最早记录开始消费或者从最新记录开始消费。
HighLevel 消费模型的消费协调工作由服务端 Coordinator 组件管理。在 HighLevel 消费模型中,消费者无需决策要消费的分区,服务端会根据客户端配置的 PARTITION_ASSIGNMENT_STRATEGY_CONFIG
参数来分配分区,该参数默认取值 RangeAssignor
,默认将分区均匀的分配给组内所有消费者。
LowLevel 消费模型的消费协调工作由消费者主动管理。LowLevel 消费模型通常适用于需要保持状态的业务服务,处于特定状态的服务需要消费固定的分区,在此场景下,消费者需要主动指定各自消费的分区,而无需服务端 Coordinator 组件介入。
参数 | 说明 | 默认值 | 配置建议 |
---|---|---|---|
session.timeout.ms | 心跳超时时间,超过此时间没有收到消费者发送的心跳,服务端会触发消费组的 Rebalance。 | 30s |
|
max.poll.records | 单次 poll 调用获取的最大消息数。 说明 客户端在调用 poll 方法时,会触发心跳线程,向服务端发送心跳请求。 | 500 | 根据每条消息的处理时长进行配置,从而控制客户端 poll 调用的频率。 |
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //在控制台申请消费消息的consumerGroup String group = "this is your group."; //消费offset策略:earliest, latest, none String offsetReset = "earliest"; Properties properties = new Properties(); // 必要参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 根据也场景调优参数 // 若设置为false,则需要手动提交位点,即调用consumer.commitSync()方法。 // 默认值为true即自动提交位点,每隔5s会将当前消费的位点提交到Kafka。 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 心跳的间隔,即每隔多少ms会向服务端发送心跳,默认值为3s。 properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); // 服务端如果在session.timeout.ms时间内没有收到心跳,则认为当前session过期,会触发rebalance。 // 建议适当调大此值,比如设置为60s。 properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); // 调用poll方法一次返回的最大记录数,默认值为500,建议调小此值比如100. properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 用于指定两次poll的最大时间间隔,默认5分钟,如果超过了该间隔会触发rebalance。 properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); List<String> topicList = Lists.newArrayList(topic); consumer.subscribe(topicList); while (true) { try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } catch (Exception e) { logger.error("consume error", e); } }
max.poll.records
参数:单次 poll 调用获取的最大消息数量。减少消息数量可以缩短 poll 调用的时间间隔,促进网络层面的频繁交互。此参数默认取值为 500,您可根据实际业务场景或消费能力适当调小。//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //在控制台申请消费消息的consumerGroup String group = "this is your group."; //分区值partition int partition = 0; Properties properties = new Properties(); // 必要参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 根据也场景调优参数 // 若设置为false,则需要手动提交位点,即调用consumer.commitSync()方法。 // 默认值为true即自动提交位点,每隔5s会将当前消费的位点提交到Kafka。 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 调用poll方法一次返回的最大记录数,默认值为500,建议调小此值比如100. properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); // 分配分区,不同的节点或者进程或者线程需要assign不同的分区 consumer.assign(Collections.singletonList(new TopicPartition(topic, partition))); while (true) { try { // poll方法的延迟,不要在 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } catch (Exception e) { logger.error("consume error", e); } }
说明
以下参数的默认值基于 Java SDK 2.3.0 版本,在不同的开发语言或版本的 SDK 中可能会有所不同。
参数 | 默认值 | 参数说明 |
---|---|---|
fetch.min.bytes | 1 bytes | 单次请求,服务端返回最小数据量。 |
fetch.max.wait.ms | 500 ms | 单次请求,服务端返回数据的最长等待时间。 |
fetch.max.bytes | 50 M | 单次请求服务端返回的最大数据量。
|
max.parttion.fetch.bytes | 1 M | 服务端每个分区返回给消费者的最大数据量。 |
session.timeout.ms | 30 s | 心跳超时时间。 |
auto.offset.reset | latest | 服务端无消费组提交的位点信息,或者消费者收到 outofrange 异常时,客户端的消费策略。取值如下:
注意 如果您的业务场景要求尽量避免丢失消息,可将此参数设置为 |
enable.auto.commit | true | 客户端的消费位点提交机制,取值包括:
|
partition.asssignment.startegy | RangeAssingnor | 分区分配策略,服务端将根据此参数设置进行分区分配,取值包括 |
client.id | 客户端根据一定规则自动配置 | 客户端的唯一标识,用于服务端在记录日志和监控指标时标识特定的客户端。 |
max.poll.records | 500 | 单次 poll 调用获取的最大消息数。根据每条消息的处理时长进行配置,从而控制客户端 poll 调用的频率。 说明 客户端在调用 poll 方法时,会触发心跳线程,向服务端发送心跳请求。 |
receive.buffer.bytes | 不涉及 | Socket 在读取数据时使用的 TCP 缓冲区的大小。 |
send.buffer.bytes |
session.time.out
时间内没有收到消费者发送的心跳,判定消费者已经离开消费组,触发 Rebalance。session.timeout.ms
:适当调大此参数,调整逻辑为大于 (max.poll.records
× 每条消息的处理时间),确保消费者在心跳超时时间内可至少调用一次 poll 方法,触发心跳线程,发送心跳。max.poll.records
:适当调小此参数,调整的逻辑为小于(session.timeout.ms
× 客户端每秒可处理的消息条数)。如果消费能力跟不上消息生产速度,会导致 Lag
不断增加。
您可通过以下方式来调整消费能力:
fetch.max.bytes
的默认值为 50M,当消费者在公网消费时,若购买的公网带宽小于 50M,则可能会由于公网带宽的限制导致消费者与服务端的连接被断开。fetch.max.bytes
:调整为小于公网带宽。在一些测试场景下,可以将该参数设置为更小的值,例如 10K(10240bytes)等。max.partition.fetch.bytes
:建议调整为 fetch.max.bytes
参数取值的 1/4。