消息队列 RocketMQ版提供 TCP 协议下的 RocketMQ 开源 Java SDK 的相关说明,本文档介绍收发定时消息和延时消息的示例代码。
如果发送消息到消息队列 RocketMQ版服务端后,不希望立即投递消息,可以使用定时或延时消息,根据消息中指定的属性延迟一定时间投递或指定时间点投递至消费端。其中,推迟到后续的某个指定时间再投递到消费端进行消费的消息为定时消息。推迟一定时间再投递到消费端进行消费的消息为延时消息,例如指定在消息发送时间的 30 分钟之后进行投递。
火山引擎消息队列 RocketMQ版提供了两种发送延时消息的方式,一种是特定延时时间,另一种是任意延时时间。您可以通过消息属性中的定时时间实现消息的定时发送,其中任意精度的延时消息包括以下两种。
特定精度延时消息,只支持特定的 18 个等级。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
任意精度延时消息:对于 2023年2月21日之后创建的 RocketMQ 实例,建议通过属性 __STARTDELIVERTIME
来使用任意精度的延时消息。
说明
开源 Apache RocketMQ SDK 支持延时消息,但不支持定时消息,因此没有专门的定时消息接口。
发送定时消息或延时消息之前,请确认已完成以下操作。
类别 | 注意事项 |
---|---|
延时时长 | 延迟消息支持自定义毫秒级延迟,延迟时长最长为 3 天或消息保留时长的 3 倍(两者取较小值)。 |
定时时间 |
|
时间精度 |
|
Exactly Only Once | 消息队列 RocketMQ版无法确保定时消息仅投递一次,某些定时消息可能会被重复投递。 |
存储空间 | 定时消息的存储资源消耗较大,同样的消息内容,定时消息的存储空间占用量约为普通消息的 3 倍,频繁使用定时消息时需要注意存储空间的占用情况。 |
消息批处理(Batch) | 使用批量方式发送的消息暂不支持设置定时或延迟能力。 |
RocketMQ 开源 Java SDK 发送定时消息或延时消息的示例代码如下。
try { Message msg = new Message("YOUR TOPIC", /*设置消息的Tag。*/ "YOUR MESSAGE TAG", /*消息内容。*/ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); /*发送延时消息,需要设置延时时间,单位毫秒(ms),消息将在指定延时时间后投递,例如消息将在3秒后投递。*/ long delayTime = System.currentTimeMillis() + 3000; msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime)); /** *若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2021-08-10 18:45:00投递。 *定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。 * long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime(); * msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp)); */ SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); // 如果需要取消之前延时的消息,再添加以下代码 // 创建取消消息对象 Message cancelMsg = new Message("YOUR TOPIC", "", "", "cancel".getBytes(StandardCharsets.UTF_8)); // 设置取消消息的时间戳,该时间戳必须与要取消的定时消息的定时时间戳一致 cancelMsg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime)); // 设置要取消消息的ID,为发送消息的唯一ID(UNIQUE_KEY),可以从发送消息的结果中获取 cancelMsg.putUserProperty("__CANCEL_SCHEDULED_MSG", sendResult.getMsgId()); // 发送取消消息,必须在定时消息被投递之前发送才可以取消,发给指定的 broker node queue 节点,没有 broker 信息的情况下,需要发给所有 broker 节点 SendResult cancelSendResult = producer.send(cancelMsg, sendResult.getMessageQueue()); } catch (Exception e) { //消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); }