消息队列 RocketMQ版提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 C++ SDK 收发顺序消息的示例代码供您参考。
顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。
MessageQueueSelector
回调函数来控制消息投递到哪个分区。发送顺序消息的示例代码如下。使用 g++ 命令进行编译。
g++ -o order_producer order_producer.cpp -lrocketmq -lpthread -lz -ldl -lrt
如果页面提示 undefined reference 相关的编译错误,请先确定是否已安装动态库,如果确定已安装,可以尝试在 g++ 命令添加 -D_GLIBCXX_USE_CXX11_ABI=0
参数重新尝试编译。
#include <iostream> #include <chrono> #include <thread> #include "rocketmq/DefaultMQProducer.h" using namespace std; using namespace rocketmq; class DefineSelectMessageQueue : public MessageQueueSelector { public: MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg) { // 实现自定义分区逻辑,根据业务传入arg参数即分区键,计算路由到哪个队列,这里以arg为int型参数为例。 int orderId = *static_cast<int *>(arg); int index = orderId % mqs.size(); return mqs[index]; } }; int main() { // 生产者名称无需申请 DefaultMQProducer producer("producer_group_name"); // 火山引擎的接入点 producer.setNamesrvAddr("accesspoint"); // AK、SK替换为RocketMQ实例的AccessKey和AccessKey Secret // 用户渠道,可以标明和用户相关即可,无需申请。 producer.setSessionCredentials("ak", "sk", "volc"); // 请确保参数设置完成之后启动Producer。 producer.start(); DefineSelectMessageQueue *queueSelector = new DefineSelectMessageQueue(); int count = 64; for (int i = 0; i < count; ++i) { // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。 MQMessage msg("you_topic_name", "TAG", "msg content"); try { SendResult sendResult = producer.send(msg, queueSelector, &i, 3, false); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl; this_thread::sleep_for(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } std::cout << "Send " << count << " messages OK, costs" << std::endl; producer.shutdown(); return 0; }
订阅顺序消息的示例代码如下。
g++ -o order_consumer order_consumer.cpp -lrocketmq -lpthread -lz -ldl -lrt
如果页面提示 undefined reference 相关的编译错误,请先确定是否已安装动态库,如果确定已安装,可以尝试在 g++ 命令添加 -D_GLIBCXX_USE_CXX11_ABI=0
参数重新尝试编译。
#include <iostream> #include <thread> #include "rocketmq/DefaultMQPushConsumer.h" using namespace rocketmq; class OrderlyMessageListener : public MessageListenerOrderly { public: ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) { for (auto item = msgs.begin(); item != msgs.end(); item++) { std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl; } return CONSUME_SUCCESS; } }; int main(int argc, char *argv[]) { // 您在火山引擎消息队列 RocketMQ控制台上申请的GID。 DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_group"); // 从火山引擎消息队列 RocketMQ控制台的实例详情页面获取。 consumer->setNamesrvAddr("your access point"); // AK、SK替换为RocketMQ实例的AccessKey和AccessKey Secret // 用户渠道,可以标明和用户相关即可,无需申请。 consumer->setSessionCredentials("ak", "sk", "VOLC"); // 请注册自定义监听函数用来处理接收到的消息,并返回响应的处理结果。 OrderlyMessageListener *messageListener = new OrderlyMessageListener(); consumer->subscribe("topic_name", "tag"); consumer->registerMessageListener(messageListener); // Start this consumer // 准备工作完成,必须调用启动函数,才可以正常工作。 // ******************************************** // 1.确保订阅关系的设置在启动之前完成。 // 2.确保相同GID下面的消费者的订阅关系一致。 // ********************************************* consumer->start(); // 保持主线程运行直到进程结束 std::this_thread::sleep_for(std::chrono::seconds(60)); consumer->shutdown(); return 0; }