本文提供使用 C++ SDK 收发事务消息的示例代码供您参考。
通过以下步骤发送事务消息。
sendMessageInTransaction
发送消息到 RocketMQ 服务端。executeLocalTransaction
执行本地事务。checkLocalTransaction
。示例代码如下。
#include <iostream> #include <chrono> #include <thread> #include "rocketmq/TransactionMQProducer.h" #include "rocketmq/MQClientException.h" #include "rocketmq/TransactionListener.h" using namespace std; using namespace rocketmq; class DefineTransactionListener : public TransactionListener { public: LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg) { /* 执行本地事务 1. 成功返回COMMIT_MESSAGE 2. 失败返回ROLLBACK_MESSAGE 3. 不确定返回UNKNOWN。服务端会触发定时任务回查状态 */ std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic() << ", transactionId:" << msg.getTransactionId() << std::endl; return UNKNOWN; } LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) { /* 回查本地事务执行情况 1. 成功返回COMMIT_MESSAGE 2. 失败返回ROLLBACK_MESSAGE 3. 不确定返回UNKNOWN。则等待下次定时任务回查。 */ std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:" << msg.getMsgId() << std::endl; return COMMIT_MESSAGE; } }; int main() { // 生产者名称无需申请 TransactionMQProducer producer("producer_group_name"); // 火山引擎的接入点 producer.setNamesrvAddr("accesspoint"); // AK、SK替换为RocketMQ实例的AccessKey和AccessKey Secret // 用户渠道,可以标明和用户相关即可,无需申请。 producer.setSessionCredentials("ak", "sk", "volc"); // 本地事务执行和回查函数。 DefineTransactionListener *exampleTransactionListener = new DefineTransactionListener(); producer.setTransactionListener(exampleTransactionListener); // 请确保参数设置完成之后启动Producer。 producer.start(); int count = 3; for (int i = 0; i < count; ++i) { // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 MQMessage msg("TRANSACTION TOPIC", "TAG", "Transaction content"); try { SendResult sendResult = producer.sendMessageInTransaction(msg, &i); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } std::cout << "Send " << count << " messages OK " << endl; std::cout << "Wait for local transaction check..... " << std::endl; for (int i = 0; i < 6; ++i) { this_thread::sleep_for(chrono::seconds(10)); std::cout << "Running " << i * 10 + 10 << " Seconds......" << std::endl; } producer.shutdown(); return 0; }
和订阅普通消息一致,请参考普通消息。