You need to enable JavaScript to run this app.
消息队列 RocketMQ版

消息队列 RocketMQ版

复制全文
Go SDK
事务消息
复制全文
事务消息

本文提供使用 Go SDK 收发事务消息的示例代码供您参考。

前提条件

发送事务消息

通过以下步骤发送事务消息。

  1. 业务侧通过 SendMessageInTransaction 发送消息到 RocketMQ 服务端。
  2. 业务侧通过 ExecuteLocalTransaction 执行本地事务。
  3. 实现业务查询事务执行是否成功的接口 CheckLocalTransaction。

示例代码如下。

import (
   "context"
   "fmt"
   "os"
   "strconv"
   "sync"
   "sync/atomic"
   "time"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
)


// 这里模拟了一个内存状态的事务执行,实际需要更换成相应的数据库等事务操作
type DemoListener struct {
   localTrans       *sync.Map
   transactionIndex int32
}

func NewDemoListener() *DemoListener {
   return &DemoListener{
      localTrans: new(sync.Map),
   }
}

// 这里是执行本地事务逻辑的方法回调
func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
   nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
   fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
   status := nextIndex % 3
   dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))

   fmt.Printf("dl")
   return primitive.UnknowState
}

// 这里broker会通知客户端进行查询本地事务是否执行成功了,实际情况需要查询数据库当中的信息是否真正执行成功了
func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
   fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
   v, existed := dl.localTrans.Load(msg.TransactionId)
   if !existed {
      fmt.Printf("unknow msg: %v, return Commit", msg)
      return primitive.CommitMessageState
   }
   
   // 这里实际对应业务从数据库中查询消息的事务状态   
   
   state := v.(primitive.LocalTransactionState)
   switch state {
   case 1:
      fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
      return primitive.CommitMessageState
   case 2:
      fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
      return primitive.RollbackMessageState
   case 3:
      fmt.Printf("checkLocalTransaction unknow: %v\n", msg)
      return primitive.UnknowState
   default:
      fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
      return primitive.CommitMessageState
   }
}

func main() {
   p, _ := rocketmq.NewTransactionProducer(
      NewDemoListener(),
      // 实例接入点信息
      producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be\*\*\*\*.rocketmq.volces.com:9876"})),
      producer.WithRetry(2),
      
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      producer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCCESSKEY",
          SecretKey: "SECRETKEY",
      }),
      
      // 同一个处理单元里面的group应该是相同的,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      producer.WithGroupName("demo"),
   )
   err := p.Start()
   if err != nil {
      fmt.Printf("start producer error: %s\n", err.Error())
      os.Exit(1)
   }

   for i := 0; i < 10; i++ {
      // 使用发送事务消息的方式来处理,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      res, err := p.SendMessageInTransaction(context.Background(),
         primitive.NewMessage("testTopic", []byte("Hello World"+strconv.Itoa(i))))

      if err != nil {
         fmt.Printf("send message error: %s\n", err)
      } else {
         fmt.Printf("send message success: result=%s\n", res.String())
      }
   }
   
   
   // 防止客户端进程退出,业务自定义处理即可
   time.Sleep(5 * time.Minute)
   err = p.Shutdown()
   if err != nil {
      fmt.Printf("shutdown producer error: %s", err.Error())
   }
}

订阅事务消息

事务消息的订阅方式与普通消息一致,示例代码如下所示。

集群模式消费

集群模式消费消息的示例代码如下。

import (
   "context"
   "fmt"
   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/consumer"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "os"
)

func main() {
   c, _ := rocketmq.NewPushConsumer(
      // 配置创建的实例信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      consumer.WithGroupName("demo-group"),
      consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{
         "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876",
      })),
      // 配置使用的密钥信息
      consumer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCESS KEY",
          SecretKey: "SECRET KEY",
      }),
   )
   
   // 处理消息的回调。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
   err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx context.Context,
      msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      for i := range msgs {
         fmt.Printf("subscribe callback: %v \n", msgs[i])
      }
      // 如果消息处理成功则返回consumer.ConsumeSuccess
      // 如果消息处理失败则返回consumer.ConsumeRetryLater
      return consumer.ConsumeSuccess, nil
   })
   if err != nil {
      fmt.Println(err.Error())
   }
   // Note: start after subscribe
   err = c.Start()
   if err != nil {
      fmt.Println(err.Error())
      os.Exit(-1)
   }

   waitChan := make(chan interface{}, 0)
   <-waitChan

}

广播模式消费

广播模式消费的示例代码如下。

import (
   "context"
   "fmt"
   "os"
   "time"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/consumer"
   "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
   c, _ := rocketmq.NewPushConsumer(
      // 配置使用的group信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      consumer.WithGroupName("demo-group"),
      // 配置使用的实例接入点
      consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{
         "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876",
      })),
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      consumer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCESS KEY",
          SecretKey: "SECRET KEY",
      }),
      // 配置使用的消费模型:配置为集群模式
      consumer.WithConsumerModel(consumer.BroadCasting),
   )
   
   // 配置处理消息的回调,对于实例 ID 格式为 MQ_INST_xxxx 的实例,其中topic的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
   err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx context.Context,
      msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      fmt.Printf("subscribe callback: %v \n", msgs)
      return consumer.ConsumeSuccess, nil
   })
   if err != nil {
      fmt.Println(err.Error())
   }
   // Note: start after subscribe
   err = c.Start()
   if err != nil {
      fmt.Println(err.Error())
      os.Exit(-1)
   }
   time.Sleep(time.Hour)
   err = c.Shutdown()
   if err != nil {
      fmt.Printf("Shutdown Consumer error: %s", err.Error())
   }
}
最近更新时间:2023.10.31 11:56:19
这个页面对您有帮助吗?
有用
有用
无用
无用