消息队列 RocketMQ版提供 TCP 协议下的 RocketMQ 开源 Go SDK 的相关说明,本文档介绍收发定时消息和延时消息的示例代码。
如果发送消息到消息队列 RocketMQ版服务端后,不希望立即投递消息,可以使用定时或延时消息,根据消息中指定的属性延迟一定时间投递或指定时间点投递至消费端。其中,推迟到后续的某个指定时间再投递到消费端进行消费的消息为定时消息。推迟一定时间再投递到消费端进行消费的消息为延时消息,例如指定在消息发送时间的 30 分钟之后进行投递。
火山引擎消息队列 RocketMQ版提供了两种发送延时消息的方式,一种是特定延时时间,另一种是任意延时时间。您可以通过消息属性中的定时时间实现消息的定时发送,其中任意精度的延时消息包括以下两种。
特定精度延时消息,只支持特定的 18 个等级。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
任意精度延时消息:对于 2023年2月21日之后创建的 RocketMQ 实例,建议通过属性 __STARTDELIVERTIME
来使用任意精度的延时消息。
说明
开源 Apache RocketMQ SDK 支持延时消息,但不支持定时消息,因此没有专门的定时消息接口。
发送定时消息或延时消息之前,请确认已完成以下操作。
类别 | 注意事项 |
---|---|
延时时长 | 延迟消息支持自定义毫秒级延迟,延迟时长最长为 3 天或消息保留时长的 3 倍(两者取较小值)。 |
定时时间 |
|
时间精度 |
|
Exactly Only Once | 消息队列 RocketMQ版无法确保定时消息仅投递一次,某些定时消息可能会被重复投递。 |
存储空间 | 定时消息的存储资源消耗较大,同样的消息内容,定时消息的存储空间占用量约为普通消息的 3 倍,频繁使用定时消息时需要注意存储空间的占用情况。 |
RocketMQ 开源 Go SDK 发送定时消息或延时消息的示例代码如下。
package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2/producer" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { // 初始化生产者 p, err := rocketmq.NewProducer( producer.WithNameServer([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"}), producer.WithRetry(2), producer.WithCredentials(primitive.Credentials{ AccessKey: "ACCCESSKEY", SecretKey: "SECRETKEY", }), ) if err != nil { fmt.Println("init producer error:", err) return } // 启动生产者 if err := p.Start(); err != nil { fmt.Println("start producer error:", err) return } defer p.Shutdown() // 创建消息 msg := &primitive.Message{ Topic: "test", Body: []byte("Hello world"), } msg.WithTag("test-2") // 发送延时消息 delayTime := time.Now().Add(60*time.Second).UnixNano() / 1e6 msg.WithProperty("__STARTDELIVERTIME", fmt.Sprintf("%d", delayTime)) /** *若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2021-08-10 18:45:00投递。 *定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。 * timeStamp := time.Date(2021, 8, 10, 18, 45, 0, 0, time.Local).UnixNano() / 1e6 * msg.WithProperty("__STARTDELIVERTIME", fmt.Sprintf("%d", timeStamp)) */ // 发送消息 res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Println("send message error:", err) return } fmt.Printf("send result: %+v\n", res) time.Sleep(30 * time.Second) // 如果需要取消之前延时的消息,再添加以下代码 // 创建取消消息对象 cancelMsg := &primitive.Message{ Topic: "test", Body: []byte("cancel"), } // 设置取消消息的时间戳,该时间戳必须与要取消的定时消息的定时时间戳一致 cancelMsg.WithProperty("__STARTDELIVERTIME", fmt.Sprintf("%d", delayTime)) // 设置要取消消息的ID,为发送消息的唯一ID(UNIQUE_KEY),可以从发送消息的结果中获取 cancelMsg.WithProperty("__CANCEL_SCHEDULED_MSG", res.MsgID) // 发送取消消息,必须在定时消息被投递之前发送才可以取消 cancelSendResult, err := p.SendSync(context.Background(), cancelMsg) if err != nil { fmt.Println("send cancel message error:", err) return } fmt.Printf("cancel send result: %+v\n", cancelSendResult) }