本文提供使用 Go SDK 收发事务消息的示例代码供您参考。
通过以下步骤发送事务消息。
示例代码如下。
import ( "context" "fmt" "os" "strconv" "sync" "sync/atomic" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) // 这里模拟了一个内存状态的事务执行,实际需要更换成相应的数据库等事务操作 type DemoListener struct { localTrans *sync.Map transactionIndex int32 } func NewDemoListener() *DemoListener { return &DemoListener{ localTrans: new(sync.Map), } } // 这里是执行本地事务逻辑的方法回调 func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState { nextIndex := atomic.AddInt32(&dl.transactionIndex, 1) fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId) status := nextIndex % 3 dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1)) fmt.Printf("dl") return primitive.UnknowState } // 这里broker会通知客户端进行查询本地事务是否执行成功了,实际情况需要查询数据库当中的信息是否真正执行成功了 func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState { fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId) v, existed := dl.localTrans.Load(msg.TransactionId) if !existed { fmt.Printf("unknow msg: %v, return Commit", msg) return primitive.CommitMessageState } // 这里实际对应业务从数据库中查询消息的事务状态 state := v.(primitive.LocalTransactionState) switch state { case 1: fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg) return primitive.CommitMessageState case 2: fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg) return primitive.RollbackMessageState case 3: fmt.Printf("checkLocalTransaction unknow: %v\n", msg) return primitive.UnknowState default: fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg) return primitive.CommitMessageState } } func main() { p, _ := rocketmq.NewTransactionProducer( NewDemoListener(), // 实例接入点信息 producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be\*\*\*\*.rocketmq.volces.com:9876"})), producer.WithRetry(2), // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。 producer.WithCredentials(primitive.Credentials{ AccessKey: "ACCCESSKEY", SecretKey: "SECRETKEY", }), // 同一个处理单元里面的group应该是相同的,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 producer.WithGroupName("demo"), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s\n", err.Error()) os.Exit(1) } for i := 0; i < 10; i++ { // 使用发送事务消息的方式来处理,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("testTopic", []byte("Hello World"+strconv.Itoa(i)))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } } // 防止客户端进程退出,业务自定义处理即可 time.Sleep(5 * time.Minute) err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
事务消息的订阅方式与普通消息一致,示例代码如下所示。
集群模式消费消息的示例代码如下。
import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" "os" ) func main() { c, _ := rocketmq.NewPushConsumer( // 配置创建的实例信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 consumer.WithGroupName("demo-group"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{ "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876", })), // 配置使用的密钥信息 consumer.WithCredentials(primitive.Credentials{ AccessKey: "ACCESS KEY", SecretKey: "SECRET KEY", }), ) // 处理消息的回调。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("subscribe callback: %v \n", msgs[i]) } // 如果消息处理成功则返回consumer.ConsumeSuccess // 如果消息处理失败则返回consumer.ConsumeRetryLater return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } waitChan := make(chan interface{}, 0) <-waitChan }
广播模式消费的示例代码如下。
import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, _ := rocketmq.NewPushConsumer( // 配置使用的group信息,对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 consumer.WithGroupName("demo-group"), // 配置使用的实例接入点 consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{ "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876", })), // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。 consumer.WithCredentials(primitive.Credentials{ AccessKey: "ACCESS KEY", SecretKey: "SECRET KEY", }), // 配置使用的消费模型:配置为集群模式 consumer.WithConsumerModel(consumer.BroadCasting), ) // 配置处理消息的回调,对于实例 ID 格式为 MQ_INST_xxxx 的实例,其中topic的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } }