消息队列 RocketMQ版提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 Go SDK 收发顺序消息的示例代码供您参考。
顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。
MessageQueueSelector
回调函数来控制消息投递到哪个分区。发送顺序消息的示例代码如下。
import ( "context" "fmt" "os" "strconv" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { // 这里按照业务逻辑来选择每个消息投递到哪个队列当中 queueSelector := producer.NewHashQueueSelector() p, _ := rocketmq.NewProducer( // 配置RocketMQ实例的接入点。 producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"})), producer.WithRetry(2), // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。 producer.WithCredentials(primitive.Credentials{ AccessKey: "ACCCESSKEY", SecretKey: "SECRETKEY", }), // 配置使用的队列选择回调 producer.WithQueueSelector(queueSelector), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } // 配置Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 topic := "testTopic" for i := 0; i < 10; i++ { msg := &primitive.Message{ Topic: topic, Body: []byte("Hello World " + strconv.Itoa(i)), } // msg 指定分区key, 自定义即可 msg.WithShardingKey("key" + strconv.ItoA(i)) res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
订阅顺序消息的示例代码如下。
说明
订阅顺序消息之前,需要在创建 RocketMQ Consumer 的时候配置 ConsumerOrder
为 true
。
import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, _ := rocketmq.NewPushConsumer( //配置使用的Group ID。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 topic := "testTopic" consumer.WithGroupName("demo-group"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{ "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876", })), // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。 consumer.WithCredentials(primitive.Credentials{ AccessKey: "ACCESSKEY", SecretKey: "SECRETKEY", }), consumer.WithConsumerModel(consumer.Clustering), // 使用顺序消费的模式 consumer.WithConsumerOrder(true), ) err := c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { orderlyCtx, _ := primitive.GetOrderlyCtx(ctx) fmt.Printf("orderlyCtx: %#v\n", orderlyCtx) fmt.Printf("msgs: %v \n", msgs) return consumer.ConsumeSuccess, nil // 如果消费失败或者消费异常 // return consumer.SuspendCurrentQueueAMoment, err }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } }