You need to enable JavaScript to run this app.
导航
RocketMQ 消费者使用建议
最近更新时间:2023.06.01 14:51:22首次发布时间:2023.06.01 14:51:22

本文档介绍 RocketMQ 消费者的使用建议,推荐在使用消息队列 RocketMQ版进行消息生产与消费之前,阅读以下使用建议,提高接入效率和业务稳定性。

消费过程幂等

RocketMQ 无法保证消息只被消费一次(Exactly-Once),即无法避免消息重复,主要由于以下原因:

  • 消息发送失败时会重试
  • 消费者批量消费,消费进度上报时回上报最小的 offset。
  • 支持重置消费进度
    如果业务对消费重复非常敏感,务必要在业务层面进行去重处理,例如借助关系数据库进行去重。此时需要确定消息的唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 ID 等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过。
    msgId 一定是全局唯一标识符。但是实际使用中,可能会存在相同的消息有两个不同 msgId 的情况。消费者主动重发、因客户端重投机制导致的重复等,这种情况就需要使业务字段进行重复消费。

消费速率慢处理

RocketMQ 的每个消费者的能力不同,线上经常会发生消费堆积的问题。您可以通过以下方式处理:

提高消费并行度

绝大部分消息消费行为都属于 IO 密集型,即操作数据库或者调用 RPC。这类消费行为的消费速度依赖于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 您可以通过以下方式修改消费并行度:

  • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度。
  • 如果消费者订阅的 Topic 队列数过少,可以对 Topic 的队列数扩容,来提高消费的并行度,避免因消费者实例大于队列数,导致部分消费者空转。
  • 通过修改参数 consumeThreadMin、consumeThreadMax 提高单个 Consumer 的消费并行线程。由于 Consumer 端的消费线程池队列是无界队列,所以这两个值推荐设置为相同的数值,避免 coreMax 不生效。

批量方式消费

某些业务流程如果支持批量方式消费,可以一定程度上提高消费吞吐量。例如订单扣款类应用中,一次处理一个订单耗时 1s,一次处理 10 个订单可能也只耗时 2s,这样即可大幅度提高消费的吞吐量。可通过 consumer 的 consumeMessageBatchMaxSize 参数设置批量消费,该参数默认为 1,即一次只消费一条消息,如果设置为 N,那么每次消费的消息数小于等于 N。

说明

某些业务通常通过批量方式获取消息,即只取第一条消息,此时默认情况下没有问题。如果设置了 consumeMessageBatchMaxSize,listener 的回调返回的 msgs 是多条消息,需要遍历处理,或者直接批量处理。

重置消费进度

如果业务的堆积消息过多,在预期时间内无法完成消费,而且经过评估消息不是很重要。这种情况下可以选择消费进度重置,跳过不重要的消息。重置消费进度的操作步骤请参考重置消费位点

业务消费逻辑优化

如果经由以上方式调试后,消费速率仍未提升,则需要排查是否由于业务消费逻辑慢导致消费速率不高。

  • 通过查看消费组的客户端监控,观察 consumeRT 指标,看消息的消费平均延迟有多大。
  • 观察消费者实例,查看本地缓存的消息数,判断是否触发了流控限制。

消费暂停处理

消费者消费消息时,会提交到线程池去消费,如果消费突然暂停,查看消费进度时发现进度一直没有任何更新。通常是业务逻辑处理等原因阻塞了消费进程。您可以在消费者所在的机器上执行 jstack pid 查看线程状态,观察是否有 Blocked 状态。还可以多打印几次不同时间点的 jstack 状态,观察消费线程池的状态是否一直为 running 状态,这种有时也是不正常的消费状态。
解决方式:

  • 重启消费客户端可暂时解决该现象。
  • 长期来看,需要和业务方一起排查下业务逻辑问题需要设置消费者消费的超时时间。例如消费写入 ES,因为 ES 集群不可用,导致消费写入 ES 的线程一直处于 Running 状态,但实际上是消费暂停状态。

消费组订阅一致性

订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例所订阅的 Topic、Tag 必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

  • 订阅的 Topic 必须一致,例如 Consumer1 订阅 TopicA 和 TopicB,Consumer2 也必须订阅 TopicA 和 TopicB,不能只订阅 TopicA、只订阅 TopicB 或订阅 TopicA和 TopicC。
  • 订阅的同一个 Topic 中的 Tag 必须一致,包括Tag的数量和 Tag 的顺序,例如:Consumer1 订阅 TopicB 且 Tag 为 Tag1||Tag2,Consumer2 订阅 TopicB 的 Tag 也必须是 Tag1||Tag2,不能只订阅 Tag1、只订阅 Tag2 或者订阅 Tag2||Tag1。
  • 同一个消费组下面的消费者实例的 clientId 不能相同。
    Q:为什么订阅不一致会导致消息丢失?
    A:假如一个 ConsumerGroup 下面有 2 个消费者实例,consumer1 订阅了 TopicA,consuemr2 订阅了 TopicB。那么对于 consumer 来说,客户端 rebalance 时发现这个 consumerGroup 有 2 个实例,就会对 TopicA 下面的队列进行均分,consumer1 分得一半的队列进行消费(这部分消费没问题),另一半队列分配给 consumer2,但是 consumer2 没有订阅 TopicA,导致这部分队列被过滤掉了,最终造成的结果时一半的队列没有被消费。

重置消费位点

当新建一个 consumerGroup 时,需要决定是否消费之前的历史消息,可以通过设置起始消费位点来配置。目前提供了三种重置消费位点的方式:

  • CONSUME_FROM_LAST_OFFSET:(默认方式)从最新的位点开始消费,之前的消息会被忽略。
  • CONSUME_FROM_FIRST_OFFSET:从还存在broker中的消息中最开始的位点开始消费。
  • CONSUME_FROM_TIMESTAMP:从指定的时间戳开始消费。

说明

某些情况下,即使设置了从 last offset 开始消费,但是发现消费者还是消费到了之前的消息。这种主要是出现在Topic 是最近创建的,消息还没有被清理过的场景。如果不想消费之前的消息,可以设置从指定时间戳开始消费。