You need to enable JavaScript to run this app.
导航
顺序消息
最近更新时间:2023.10.31 11:56:20首次发布时间:2023.05.12 15:10:40

消息队列 RocketMQ版提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 Python SDK 收发顺序消息的示例代码供您参考。

背景信息

顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。

  • 全局顺序:
    对于指定的一个 Topic,所有消息的生产和消费需要遵循一定的顺序,消息的消费顺序必须和生产顺序一致,即需要严格的先入先出 FIFO(First In First Out)的顺序进行发布和消费。
  • 分区顺序:
    对于指定的一个 Topic,其中每一个分区的消息生产与消费是有序的,同一个队列内的消息按照严格的 FIFO 顺序进行发布和订阅。消息投递到哪一个分区由消息的 Sharding Key 来进行区分。在 SDK 中可以通过指定 Sharding Key 和 MessageQueueSelector 回调函数来控制消息投递到哪个分区。

前提条件

发送顺序消息

发送顺序消息的示例代码如下。

from rocketmq.client import Producer, Message
 
producer_group = ""  # 生产者group
name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876"  # 火山引擎控制台展示的TCP接入点
topic = ""  # 在火山引擎控制台Topic管理页面创建的topic名称
access_key = ""  # RocketMQ实例密钥管理页面获取到的AccessKey ID
access_secret = ""   # RocketMQ实例密钥管理页面获取到的AccessKey Secret
key = ""  # 消息key
tag = ""  # 消息tag

# 创建并启动生产者实例
producer = Producer(producer_group)
producer.set_name_server_address(name_server_addr)
producer.set_session_credentials(access_key, access_secret, "")
producer.start()

# 组装消息
msg = Message(topic)
msg.set_keys(key)
msg.set_tags(tag)
msg.set_body("hello volcano engine")
sharding_key = "key" # 指定消息投递的sharding key
ret = producer.send_orderly_with_sharding_key(msg, sharding_key) # 单向发送消息

print(ret.status, ret.msg_id, ret.offset)

# 关闭生产者实例,释放资源
producer.shutdown()

订阅顺序消息

订阅顺序消息的示例代码如下。

说明

订阅顺序消息之前,需要在创建 RocketMQ Consumer 的时候配置 ConsumerOrdertrue

import time

from rocketmq.client import PushConsumer, ConsumeStatus

name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876"  # 火山引擎控制台展示的TCP接入点
topic = ""  # 在火山引擎控制台Topic管理页面创建的topic名称
group = ""  # 在火山引擎控制台Group管理页面创建的group名称
access_key = ""  # RocketMQ实例密钥管理页面获取到的AccessKey ID
access_secret = ""  # RocketMQ实例密钥管理页面获取到的AccessKey Secret
tags = ""  # 消费消息匹配的tag,多个tag使用||进行分隔

def callback(msg):
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS

consumer = PushConsumer(group, orderly=True) # 指定消费者为顺序消费类型
consumer.set_name_server_address(name_server_addr)
consumer.set_session_credentials(access_key, access_secret, "")

consumer.subscribe(topic, callback, tags)
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()