火山引擎消息队列 RocketMQ版提供同步发送和单向(Oneway)发送两种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。
火山引擎消息队列 RocketMQ版提供的普通消息发送方式包括以下三种,您可以根据业务要求选择合适的发送方式。
/ | 同步发送 | 单向(Oneway)发送 | 单向发送 |
---|---|---|---|
发送方式 | 消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。 | 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。 | 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。 |
应用场景 | 重要通知邮件、报名短信通知、营销短信系统等。 | 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。 | 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。 |
发送TPS | 快 | 最快 | 最快 |
是否反馈发送结果 | 反馈 | 不反馈 | 不反馈 |
可靠性 | 不丢失消息 | 可能丢失消息 | 可能丢失消息 |
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下。
from rocketmq.client import Producer, Message producer_group = "" # 生产者group name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "" # 在火山引擎控制台Topic管理页面创建的topic名称 access_key = "" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "" # RocketMQ实例密钥管理页面获取到的AccessKey Secret key = "" # 消息key tag = "" # 消息tag # 创建并启动生产者实例 producer = Producer(producer_group) producer.set_name_server_address(name_server_addr) producer.set_session_credentials(access_key, access_secret, "") producer.start() # 组装消息 msg = Message(topic) msg.set_keys(key) msg.set_tags(tag) msg.set_body("hello volcano engine") ret = producer.send_sync(msg) #同步发送消息 print(ret.status, ret.msg_id, ret.offset) # 关闭生产者实例,释放资源 producer.shutdown()
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
单向发送适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集等场景。
单向发送的示例代码如下。
from rocketmq.client import Producer, Message producer_group = "" # 生产者group name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "" # 在火山引擎控制台Topic管理页面创建的topic名称 access_key = "" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "" # RocketMQ实例密钥管理页面获取到的AccessKey Secret key = "" # 消息key tag = "" # 消息tag # 创建并启动生产者实例 producer = Producer(producer_group) producer.set_name_server_address(name_server_addr) producer.set_session_credentials(access_key, access_secret, "") producer.start() # 组装消息 msg = Message(topic) msg.set_keys(key) msg.set_tags(tag) msg.set_body("hello volcano engine") ret = producer.send_oneway(msg) # 单向发送消息 print(ret.status, ret.msg_id, ret.offset) # 关闭生产者实例,释放资源 producer.shutdown()
集群模式消费
集群模式消费消息的示例代码如下。
import time from rocketmq.client import PushConsumer, ConsumeStatus name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "" # 在火山引擎控制台Topic管理页面创建的topic名称 group = "" # 在火山引擎控制台Group管理页面创建的group名称 access_key = "" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "" # RocketMQ实例密钥管理页面获取到的AccessKey Secret tags = "" # 消费消息匹配的tag,多个tag使用||进行分隔 def callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESS consumer = PushConsumer(group) consumer.set_name_server_address(name_server_addr) consumer.set_session_credentials(access_key, access_secret, "") consumer.subscribe(topic, callback, tags) consumer.start() while True: time.sleep(3600) consumer.shutdown()
广播模式消费的示例代码如下。
import time from rocketmq.client import PushConsumer, ConsumeStatus from rocketmq.ffi import MessageModel name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "" # 在火山引擎控制台Topic管理页面创建的topic名称 group = "" # 在火山引擎控制台Group管理页面创建的group名称 access_key = "" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "" # RocketMQ实例密钥管理页面获取到的AccessKey Secret tags = "" # 消费消息匹配的tag,多个tag使用||进行分隔 def callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESS consumer = PushConsumer(group, message_model=MessageModel.BROADCASTING) # 指定消费模式为广播模式 consumer.set_name_server_address(name_server_addr) consumer.set_session_credentials(access_key, access_secret, "") consumer.subscribe(topic, callback, tags) consumer.start() while True: time.sleep(3600) consumer.shutdown()