You need to enable JavaScript to run this app.
导航
定时消息和延时消息
最近更新时间:2023.11.21 19:52:02首次发布时间:2023.08.18 15:50:09

消息队列 RocketMQ版提供 TCP 协议下的 RocketMQ 开源 Go SDK 的相关说明,本文档介绍收发定时消息和延时消息的示例代码。

背景信息

如果发送消息到消息队列 RocketMQ版服务端后,不希望立即投递消息,可以使用定时或延时消息,根据消息中指定的属性延迟一定时间投递或指定时间点投递至消费端。其中,推迟到后续的某个指定时间再投递到消费端进行消费的消息为定时消息。推迟一定时间再投递到消费端进行消费的消息为延时消息,例如指定在消息发送时间的 30 分钟之后进行投递。
火山引擎消息队列 RocketMQ版提供了两种发送延时消息的方式,一种是特定延时时间,另一种是任意延时时间。您可以通过消息属性中的定时时间实现消息的定时发送,其中任意精度的延时消息包括以下两种。

  • 特定精度延时消息,只支持特定的 18 个等级。

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    
  • 任意精度延时消息:对于 2023年2月21日之后创建的 RocketMQ 实例,建议通过属性 __STARTDELIVERTIME 来使用任意精度的延时消息。

说明

开源 Apache RocketMQ SDK 支持延时消息,但不支持定时消息,因此没有专门的定时消息接口。

前提条件

发送定时消息或延时消息之前,请确认已完成以下操作。

  • 创建 Go 项目,并创建 RocketMQ 实例等资源。详细操作请参考准备工作
  • 下载并安装开源 Apache RocketMQ Go SDK。
  • 2023年03月20日之前创建的 RocketMQ 实例,如需使用定时或延时消息,请通过工单系统联系技术支持升级实例版本。

注意事项

类别

注意事项

延时时长

延迟消息支持自定义毫秒级延迟,延迟时长最长为 3 天或消息保留时长的 3 倍(两者取较小值)。

定时时间

  • 定时消息的定时时间即服务端向消费者发起投递的时间,以服务端时间为准。如果客户端和服务端时间存在时间差,消息的实际投递时间可能与与客户端设置的投递时间不一致。
  • 如果消费者存在消息堆积现象,服务端优先处理堆积消息,队列中的定时消息会被排在堆积消息之后,此时无法严格按照客户端配置的定时时间进行投递。
  • 定时消息的消息发送时间必须指定为当前时间点之后的某个时刻,否则此消息将被立刻投递给消费者。
  • 定时消息同样受消息保存时长的限制。例如,指定服务端 1 天后开始投递定时消息,如果此消息在消息保存时长(3 天)之内未被消费者消费,那么这条消息将在第 4 天被删除。

时间精度

  • 通常情况下,客户端指定的定时消息发送时间与实际服务端发送消息的时间误差在 0.1s 以内。但如果定时消息数量或长度较大,服务端投递任务繁忙,可能会触发定时消息投递的流控机制,此时无法保证 0.1s 的时间误差。
  • 在 0.1s 的时间精度内,消息队列 RocketMQ版无法保证消息投递的顺序性。即如果客户端未两条定时消息指定的定时时间差距小于 0.1s,无法保证严格按照消息发送的顺序投递消息。

Exactly Only Once

消息队列 RocketMQ版无法确保定时消息仅投递一次,某些定时消息可能会被重复投递。

存储空间

定时消息的存储资源消耗较大,同样的消息内容,定时消息的存储空间占用量约为普通消息的 3 倍,频繁使用定时消息时需要注意存储空间的占用情况。

发送定时或延时消息

RocketMQ 开源 Go SDK 发送定时消息或延时消息的示例代码如下。

package main

import (
   "context"
   "fmt"
   "github.com/apache/rocketmq-client-go/v2/producer"
   "time"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
   // 初始化生产者
   p, err := rocketmq.NewProducer(
      producer.WithNameServer([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"}),
      producer.WithRetry(2),
      producer.WithCredentials(primitive.Credentials{
         AccessKey: "ACCCESSKEY",
         SecretKey: "SECRETKEY",
      }),
   )
   if err != nil {
      fmt.Println("init producer error:", err)
      return
   }

   // 启动生产者
   if err := p.Start(); err != nil {
      fmt.Println("start producer error:", err)
      return
   }
   defer p.Shutdown()

   // 创建消息
   msg := &primitive.Message{
      Topic: "test",
      Body:  []byte("Hello world"),
   }
   msg.WithTag("test-2")
   // 发送延时消息
   delayTime := time.Now().Add(60*time.Second).UnixNano() / 1e6
   msg.WithProperty("__STARTDELIVERTIME", fmt.Sprintf("%d", delayTime))

   /**
    *若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2021-08-10 18:45:00投递。
    *定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。
    * timeStamp := time.Date(2021, 8, 10, 18, 45, 0, 0, time.Local).UnixNano() / 1e6
    * msg.WithProperty("__STARTDELIVERTIME", fmt.Sprintf("%d", timeStamp))
    */

   // 发送消息
   res, err := p.SendSync(context.Background(), msg)
   if err != nil {
      fmt.Println("send message error:", err)
      return
   }
   fmt.Printf("send result: %+v\n", res)
   time.Sleep(30 * time.Second)
   // 如果需要取消之前延时的消息,再添加以下代码
   // 创建取消消息对象
   cancelMsg := &primitive.Message{
      Topic: "test",
      Body:  []byte("cancel"),
   }
   // 设置取消消息的时间戳,该时间戳必须与要取消的定时消息的定时时间戳一致
   cancelMsg.WithProperty("__STARTDELIVERTIME", fmt.Sprintf("%d", delayTime))
   // 设置要取消消息的ID,为发送消息的唯一ID(UNIQUE_KEY),可以从发送消息的结果中获取
   cancelMsg.WithProperty("__CANCEL_SCHEDULED_MSG", res.MsgID)
   // 发送取消消息,必须在定时消息被投递之前发送才可以取消
   cancelSendResult, err := p.SendSync(context.Background(), cancelMsg)
   if err != nil {
      fmt.Println("send cancel message error:", err)
      return
   }
   fmt.Printf("cancel send result: %+v\n", cancelSendResult)
}