You need to enable JavaScript to run this app.
导航
RocketMQ 生产者使用建议
最近更新时间:2024.02.01 17:38:28首次发布时间:2023.06.01 14:51:22

本文档介绍 RocketMQ 生产者的使用建议,推荐在使用消息队列 RocketMQ版进行消息生产与消费之前,阅读以下使用建议,提高接入效率和业务稳定性。

消息 Tag

  • 建议组合使用 Topic 和 tags,以减少 Topic 的使用。
  • Tag 可以由应用自行设置。
  • 仅当生产者在发送消息时设置了 Tag,消费者在订阅消息时才可以利用 Tag 进行消息过滤,例如 message.setTags("TagA")。消费者在 Broker 侧根据 Tag 的 hashcode 进行初步过滤,在消费端根据字符串过滤。

消息 Key

每个消息在业务层面的唯一标识码要设置到 keys 字段,便于定位消息丢失等问题。消息队列 RocketMQ版服务端会为每个消息创建索引,您可以在控制台中通过 topic + key 来查询这条消息的内容,以及消息被谁消费。使用消息 Key 时,请注意:

  • 建议消息生产者为每条消息设置具有业务区分度的 Key。
  • 索引类型为哈希索引,所以务必保证 key 尽可能唯一,以确保以此避免潜在的哈希冲突。
  • 相同的 Key 的消息数量尽量少,最大不超过 64 条,否则消息查询结果不完整。

设置消息 Key 的方式请参考:

// 订单Id String orderId = "20034568923546";   
 message.setKeys(orderId);   

消息 ID

RocketMQ 发送消息返回的 SendResult 里面会有两个消息 ID,一个是 msgId,一个是 offsetMsgId。

  • msgId:客户端生成的唯一消息 ID,即便消息重发,消息 ID 也不会发生变化,一般可以作为唯一键用来消息去重。 msgId 生成规则主要包括客户端 IP、进程 ID、加载 MessageClientIDSetter 的类加载器的 hashcode、当前时间与系统启动时间的差值、自增序号等
  • offsetMsgId:Broker 生成的消息 ID,主要是记录的 Broker 的地址和消息的物理偏移量。不能保证唯一,消息重发就会导致相同的消息有不一样的 msgId。

日志打印

建议在消息发送成功或者失败时打印消息日志,日志中应包含 SendResult 和 Key 字段。可根据实际情况来选择是否打印消息体,如果消息内容比较重要,在消息发送失败时推荐打印消息体。

说明

对于发送结果为 SEND_OK 的消息,可以不打印消息日志,以免造成日志过多,浪费存储资源。

消息发送模式

目前消息队列 RocketMQ版提供了三种消息发送模式,说明如下:

说明

  • 异步发送和单向发送由于不需要等待返回结果就可以继续发送,消息的吞吐量会比较高,但是容易造成broker的发送线程池处理不过来,造成队列满了任务被拒绝。
  • 各种发送模式的完整实例代码可参考普通消息

发送模式

说明

示例

同步发送

Producer 发送消息后会等待服务端 broker 的返回结果,消息可靠性高,推荐使用这种模式。

SendResult sendResult = producer.send(msg)

异步发送

Producer 发送消息后,不会被阻塞,可以继续发送下一条消息。消息处理完毕从 Broker 返回结果后,直接回调 CallBack 函数进行逻辑处理。

producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {

    }

    @Override
    public void onException(Throwable e) {

    }
});

单向发送

Producer 只负责消息发送,不关心结果,Broker也不会返回处理结果。这种模式消息的可靠性比较低,一般用于日志处理,或者对消息可靠性要求不高的场景。

producer.sendOneway(msg);

发送成功状态

send 消息方法只要不抛出异常,就代表发送成功。发送成功会有多个状态,在 sendResult 里定义。每个状态的说明如下:

状态

说明

SEND_OK

消息发送成功。但是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步 Master 服务器或同步刷盘,即 SYNC_MASTER 或 SYNC_FLUSH。

FLUSH_DISK_TIMEOUT

刷盘超时。表示消息发送成功但是服务器刷盘超时。
此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果 Broker 服务器设置了刷盘方式为同步刷盘,即 FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当 Broker 服务器未在同步刷盘时间内(默认为 5s)完成刷盘,则将返回该状态。

FLUSH_SLAVE_TIMEOUT

数据同步到 Slave 服务器超时。表示消息发送成功,但是服务器同步到Slave时超时。
此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果 Broker 服务器的角色是同步 Master,即 SYNC_MASTER(默认是异步 Master 即 ASYNC_MASTER),并且从 Broker 服务器未在同步刷盘时间(默认为 5 秒)内完成与主服务器的同步,则将返回该状态。

SLAVE_NOT_AVAILABLE

无 Slave 服务器可用。表示消息发送成功,但是此时 Slave 不可用。如果 Broker 服务器的角色是同步 Master,即 SYNC_MASTER(默认是异步 Master 服务器即 ASYNC_MASTER),但没有配置 Slave Broker 服务器,则将返回该状态。

消息发送失败处理

Producer 的 send 方法本身支持内部重试,重试逻辑如下:

  • 同步发送:重试 2 次,总共发送 3 次。通过 retryTimesWhenSendFailed 参数可以配置重试逻辑。重试时会选择另一个 Broker master 进行重试。
  • 异步发送:重试 2 次,总共发送 3 次。通过 retryTimesWhenSendAsyncFailed 可配置重试次数。
  • 单向发送:不重试。

以上策略在一定程度上可保证消息成功发送。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑。例如调用 send 同步方法发送失败时,则尝试将消息存储到数据库,然后由后台线程定时重试,确保消息一定到达 Broker。

说明

如果服务端 Broker 返回的 response code 为 SYSTEM_BUSY,客户端会直接抛出异常。例如以下情况:

  • 消息存入 Broker pagecache 耗时超过 1000ms,导致消息快速失败被清理。
  • 消息在发送队列里面等待时间超过 waitTimeMillsInSendQueue(默认 1000ms)。
  • Broker pagecache busy,消息到达 Broker 直接被拒绝请求。
  • Broker 发送线程池队列任务已满,新任务被拒绝提交到线程池。

消息发送超时时间

Producer 默认的消息发送超时时间是 3000ms,可通过 sendMsgTimeout 配置。在生产环境,不建议将超时时间配置的很短,因为 RocketMQ 只能保证 P99 的延迟在几毫秒以内,部分毛刺的时间可能会比较大。如果时间配置较短,容易导致消息发送失败。