本文档介绍 RocketMQ 生产者的使用建议,推荐在使用消息队列 RocketMQ版进行消息生产与消费之前,阅读以下使用建议,提高接入效率和业务稳定性。
message.setTags("TagA")
。消费者在 Broker 侧根据 Tag 的 hashcode 进行初步过滤,在消费端根据字符串过滤。每个消息在业务层面的唯一标识码要设置到 keys 字段,便于定位消息丢失等问题。消息队列 RocketMQ版服务端会为每个消息创建索引,您可以在控制台中通过 topic + key 来查询这条消息的内容,以及消息被谁消费。使用消息 Key 时,请注意:
设置消息 Key 的方式请参考:
// 订单Id String orderId = "20034568923546"; message.setKeys(orderId);
RocketMQ 发送消息返回的 SendResult 里面会有两个消息 ID,一个是 msgId,一个是 offsetMsgId。
建议在消息发送成功或者失败时打印消息日志,日志中应包含 SendResult 和 Key 字段。可根据实际情况来选择是否打印消息体,如果消息内容比较重要,在消息发送失败时推荐打印消息体。
说明
对于发送结果为 SEND_OK 的消息,可以不打印消息日志,以免造成日志过多,浪费存储资源。
目前消息队列 RocketMQ版提供了三种消息发送模式,说明如下:
说明
发送模式 | 说明 | 示例 |
---|---|---|
同步发送 | Producer 发送消息后会等待服务端 broker 的返回结果,消息可靠性高,推荐使用这种模式。 |
|
异步发送 | Producer 发送消息后,不会被阻塞,可以继续发送下一条消息。消息处理完毕从 Broker 返回结果后,直接回调 CallBack 函数进行逻辑处理。 |
|
单向发送 | Producer 只负责消息发送,不关心结果,Broker也不会返回处理结果。这种模式消息的可靠性比较低,一般用于日志处理,或者对消息可靠性要求不高的场景。 |
|
send 消息方法只要不抛出异常,就代表发送成功。发送成功会有多个状态,在 sendResult 里定义。每个状态的说明如下:
状态 | 说明 |
---|---|
SEND_OK | 消息发送成功。但是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步 Master 服务器或同步刷盘,即 SYNC_MASTER 或 SYNC_FLUSH。 |
FLUSH_DISK_TIMEOUT | 刷盘超时。表示消息发送成功但是服务器刷盘超时。 |
FLUSH_SLAVE_TIMEOUT | 数据同步到 Slave 服务器超时。表示消息发送成功,但是服务器同步到Slave时超时。 |
SLAVE_NOT_AVAILABLE | 无 Slave 服务器可用。表示消息发送成功,但是此时 Slave 不可用。如果 Broker 服务器的角色是同步 Master,即 SYNC_MASTER(默认是异步 Master 服务器即 ASYNC_MASTER),但没有配置 Slave Broker 服务器,则将返回该状态。 |
Producer 的 send 方法本身支持内部重试,重试逻辑如下:
以上策略在一定程度上可保证消息成功发送。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑。例如调用 send 同步方法发送失败时,则尝试将消息存储到数据库,然后由后台线程定时重试,确保消息一定到达 Broker。
说明
如果服务端 Broker 返回的 response code 为 SYSTEM_BUSY,客户端会直接抛出异常。例如以下情况:
Producer 默认的消息发送超时时间是 3000ms,可通过 sendMsgTimeout 配置。在生产环境,不建议将超时时间配置的很短,因为 RocketMQ 只能保证 P99 的延迟在几毫秒以内,部分毛刺的时间可能会比较大。如果时间配置较短,容易导致消息发送失败。