You need to enable JavaScript to run this app.
导航
事务消息
最近更新时间:2023.10.31 11:56:19首次发布时间:2022.01.28 15:16:40

本文提供使用 Go SDK 收发事务消息的示例代码供您参考。

前提条件

发送事务消息

通过以下步骤发送事务消息。

  1. 业务侧通过 SendMessageInTransaction 发送消息到 RocketMQ 服务端。
  2. 业务侧通过 ExecuteLocalTransaction 执行本地事务。
  3. 实现业务查询事务执行是否成功的接口 CheckLocalTransaction。

示例代码如下。

import (
   "context"
   "fmt"
   "os"
   "strconv"
   "sync"
   "sync/atomic"
   "time"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
)


// 这里模拟了一个内存状态的事务执行,实际需要更换成相应的数据库等事务操作
type DemoListener struct {
   localTrans       *sync.Map
   transactionIndex int32
}

func NewDemoListener() *DemoListener {
   return &DemoListener{
      localTrans: new(sync.Map),
   }
}

// 这里是执行本地事务逻辑的方法回调
func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
   nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
   fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
   status := nextIndex % 3
   dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))

   fmt.Printf("dl")
   return primitive.UnknowState
}

// 这里broker会通知客户端进行查询本地事务是否执行成功了,实际情况需要查询数据库当中的信息是否真正执行成功了
func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
   fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
   v, existed := dl.localTrans.Load(msg.TransactionId)
   if !existed {
      fmt.Printf("unknow msg: %v, return Commit", msg)
      return primitive.CommitMessageState
   }
   
   // 这里实际对应业务从数据库中查询消息的事务状态   
   
   state := v.(primitive.LocalTransactionState)
   switch state {
   case 1:
      fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
      return primitive.CommitMessageState
   case 2:
      fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
      return primitive.RollbackMessageState
   case 3:
      fmt.Printf("checkLocalTransaction unknow: %v\n", msg)
      return primitive.UnknowState
   default:
      fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
      return primitive.CommitMessageState
   }
}

func main() {
   p, _ := rocketmq.NewTransactionProducer(
      NewDemoListener(),
      // 实例接入点信息
      producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be\*\*\*\*.rocketmq.volces.com:9876"})),
      producer.WithRetry(2),
      
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      producer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCCESSKEY",
          SecretKey: "SECRETKEY",
      }),
      
      // 同一个处理单元里面的group应该是相同的,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      producer.WithGroupName("demo"),
   )
   err := p.Start()
   if err != nil {
      fmt.Printf("start producer error: %s\n", err.Error())
      os.Exit(1)
   }

   for i := 0; i < 10; i++ {
      // 使用发送事务消息的方式来处理,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      res, err := p.SendMessageInTransaction(context.Background(),
         primitive.NewMessage("testTopic", []byte("Hello World"+strconv.Itoa(i))))

      if err != nil {
         fmt.Printf("send message error: %s\n", err)
      } else {
         fmt.Printf("send message success: result=%s\n", res.String())
      }
   }
   
   
   // 防止客户端进程退出,业务自定义处理即可
   time.Sleep(5 * time.Minute)
   err = p.Shutdown()
   if err != nil {
      fmt.Printf("shutdown producer error: %s", err.Error())
   }
}

订阅事务消息

事务消息的订阅方式与普通消息一致,示例代码如下所示。

集群模式消费

集群模式消费消息的示例代码如下。

import (
   "context"
   "fmt"
   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/consumer"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "os"
)

func main() {
   c, _ := rocketmq.NewPushConsumer(
      // 配置创建的实例信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      consumer.WithGroupName("demo-group"),
      consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{
         "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876",
      })),
      // 配置使用的密钥信息
      consumer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCESS KEY",
          SecretKey: "SECRET KEY",
      }),
   )
   
   // 处理消息的回调。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
   err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx context.Context,
      msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      for i := range msgs {
         fmt.Printf("subscribe callback: %v \n", msgs[i])
      }
      // 如果消息处理成功则返回consumer.ConsumeSuccess
      // 如果消息处理失败则返回consumer.ConsumeRetryLater
      return consumer.ConsumeSuccess, nil
   })
   if err != nil {
      fmt.Println(err.Error())
   }
   // Note: start after subscribe
   err = c.Start()
   if err != nil {
      fmt.Println(err.Error())
      os.Exit(-1)
   }

   waitChan := make(chan interface{}, 0)
   <-waitChan

}

广播模式消费

广播模式消费的示例代码如下。

import (
   "context"
   "fmt"
   "os"
   "time"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/consumer"
   "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
   c, _ := rocketmq.NewPushConsumer(
      // 配置使用的group信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      consumer.WithGroupName("demo-group"),
      // 配置使用的实例接入点
      consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{
         "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876",
      })),
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      consumer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCESS KEY",
          SecretKey: "SECRET KEY",
      }),
      // 配置使用的消费模型:配置为集群模式
      consumer.WithConsumerModel(consumer.BroadCasting),
   )
   
   // 配置处理消息的回调,对于实例 ID 格式为 MQ_INST_xxxx 的实例,其中topic的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
   err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx context.Context,
      msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      fmt.Printf("subscribe callback: %v \n", msgs)
      return consumer.ConsumeSuccess, nil
   })
   if err != nil {
      fmt.Println(err.Error())
   }
   // Note: start after subscribe
   err = c.Start()
   if err != nil {
      fmt.Println(err.Error())
      os.Exit(-1)
   }
   time.Sleep(time.Hour)
   err = c.Shutdown()
   if err != nil {
      fmt.Printf("Shutdown Consumer error: %s", err.Error())
   }
}