本文提供使用Python SDK收发事务消息的示例代码供您参考。
通过以下步骤发送事务消息。
示例代码如下。
import time from rocketmq.client import TransactionMQProducer, Message, TransactionStatus producer_group = "" # 生产者group name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "" # 在火山引擎控制台Topic管理页面创建的topic名称 access_key = "" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "" # RocketMQ实例密钥管理页面获取到的AccessKey Secret key = "" # 消息key tag = "" # 消息tag def transaction_checker_callback(msg, user_args): return TransactionStatus.COMMIT def transaction_local_execute(msg, user_args): return TransactionStatus.UNKNOWN # 创建并启动生产者实例 producer = TransactionMQProducer(producer_group, transaction_checker_callback) producer.set_name_server_address(name_server_addr) producer.set_session_credentials(access_key, access_secret, "") producer.start() # 组装消息 msg = Message(topic) msg.set_keys(key) msg.set_tags(tag) msg.set_body("hello volcano engine") ret = producer.send_message_in_transaction(msg, transaction_local_execute, None) print(ret.status, ret.msg_id, ret.offset) while True: time.sleep(3600)
事务消息的订阅方式与普通消息一致,示例代码如下所示。
集群模式消费消息的示例代码如下。
import time from rocketmq.client import PushConsumer, ConsumeStatus name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "" # 在火山引擎控制台Topic管理页面创建的topic名称 group = "" # 在火山引擎控制台Group管理页面创建的group名称 access_key = "" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "" # RocketMQ实例密钥管理页面获取到的AccessKey Secret tags = "" # 消费消息匹配的tag,多个tag使用||进行分隔 def callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESS consumer = PushConsumer(group) consumer.set_name_server_address(name_server_addr) consumer.set_session_credentials(access_key, access_secret, "") consumer.subscribe(topic, callback, tags) consumer.start() while True: time.sleep(3600) consumer.shutdown()
广播模式消费的示例代码如下。
import time from rocketmq.client import PushConsumer, ConsumeStatus from rocketmq.ffi import MessageModel name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "" # 在火山引擎控制台Topic管理页面创建的topic名称 group = "" # 在火山引擎控制台Group管理页面创建的group名称 access_key = "" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "" # RocketMQ实例密钥管理页面获取到的AccessKey Secret tags = "" # 消费消息匹配的tag,多个tag使用||进行分隔 def callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESS consumer = PushConsumer(group, message_model=MessageModel.BROADCASTING) # 指定消费模式为广播模式 consumer.set_name_server_address(name_server_addr) consumer.set_session_credentials(access_key, access_secret, "") consumer.subscribe(topic, callback, tags) consumer.start() while True: time.sleep(3600) consumer.shutdown()