消息队列 RocketMQ版提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 Python SDK 收发顺序消息的示例代码供您参考。
顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。
MessageQueueSelector
回调函数来控制消息投递到哪个分区。发送顺序消息的示例代码如下。
from rocketmq.client import Producer, Message producer_group = "" # 生产者group name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "" # 在火山引擎控制台Topic管理页面创建的topic名称 access_key = "" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "" # RocketMQ实例密钥管理页面获取到的AccessKey Secret key = "" # 消息key tag = "" # 消息tag # 创建并启动生产者实例 producer = Producer(producer_group) producer.set_name_server_address(name_server_addr) producer.set_session_credentials(access_key, access_secret, "") producer.start() # 组装消息 msg = Message(topic) msg.set_keys(key) msg.set_tags(tag) msg.set_body("hello volcano engine") sharding_key = "key" # 指定消息投递的sharding key ret = producer.send_orderly_with_sharding_key(msg, sharding_key) # 单向发送消息 print(ret.status, ret.msg_id, ret.offset) # 关闭生产者实例,释放资源 producer.shutdown()
订阅顺序消息的示例代码如下。
说明
订阅顺序消息之前,需要在创建 RocketMQ Consumer 的时候配置 ConsumerOrder
为 true
。
import time from rocketmq.client import PushConsumer, ConsumeStatus name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "" # 在火山引擎控制台Topic管理页面创建的topic名称 group = "" # 在火山引擎控制台Group管理页面创建的group名称 access_key = "" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "" # RocketMQ实例密钥管理页面获取到的AccessKey Secret tags = "" # 消费消息匹配的tag,多个tag使用||进行分隔 def callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESS consumer = PushConsumer(group, orderly=True) # 指定消费者为顺序消费类型 consumer.set_name_server_address(name_server_addr) consumer.set_session_credentials(access_key, access_secret, "") consumer.subscribe(topic, callback, tags) consumer.start() while True: time.sleep(3600) consumer.shutdown()