You need to enable JavaScript to run this app.
导航
普通消息
最近更新时间:2023.10.31 11:56:20首次发布时间:2023.05.12 15:10:40

火山引擎消息队列 RocketMQ版提供同步发送和单向(Oneway)发送两种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。

前提条件

发送方式

火山引擎消息队列 RocketMQ版提供的普通消息发送方式包括以下三种,您可以根据业务要求选择合适的发送方式。

/

同步发送

单向(Oneway)发送

单向发送

发送方式

消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

应用场景

重要通知邮件、报名短信通知、营销短信系统等。

适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

发送TPS

最快

最快

是否反馈发送结果

反馈

不反馈

不反馈

可靠性

不丢失消息

可能丢失消息

可能丢失消息

同步发送

同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下。

from rocketmq.client import Producer, Message

producer_group = ""  # 生产者group
name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876"  # 火山引擎控制台展示的TCP接入点
topic = ""  # 在火山引擎控制台Topic管理页面创建的topic名称
access_key = ""  # RocketMQ实例密钥管理页面获取到的AccessKey ID
access_secret = ""   # RocketMQ实例密钥管理页面获取到的AccessKey Secret
key = ""  # 消息key
tag = ""  # 消息tag

# 创建并启动生产者实例
producer = Producer(producer_group)
producer.set_name_server_address(name_server_addr)
producer.set_session_credentials(access_key, access_secret, "")
producer.start()

# 组装消息
msg = Message(topic)
msg.set_keys(key)
msg.set_tags(tag)
msg.set_body("hello volcano engine")
ret = producer.send_sync(msg) #同步发送消息

print(ret.status, ret.msg_id, ret.offset)

# 关闭生产者实例,释放资源
producer.shutdown()

单向发送

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
单向发送适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集等场景。
单向发送的示例代码如下。

from rocketmq.client import Producer, Message

producer_group = ""  # 生产者group
name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876"  # 火山引擎控制台展示的TCP接入点
topic = ""  # 在火山引擎控制台Topic管理页面创建的topic名称
access_key = ""  # RocketMQ实例密钥管理页面获取到的AccessKey ID
access_secret = ""   # RocketMQ实例密钥管理页面获取到的AccessKey Secret
key = ""  # 消息key
tag = ""  # 消息tag

# 创建并启动生产者实例
producer = Producer(producer_group)
producer.set_name_server_address(name_server_addr)
producer.set_session_credentials(access_key, access_secret, "")
producer.start()

# 组装消息
msg = Message(topic)
msg.set_keys(key)
msg.set_tags(tag)
msg.set_body("hello volcano engine")
ret = producer.send_oneway(msg) # 单向发送消息

print(ret.status, ret.msg_id, ret.offset)

# 关闭生产者实例,释放资源
producer.shutdown()

订阅普通消息

集群模式消费
集群模式消费消息的示例代码如下。

import time

from rocketmq.client import PushConsumer, ConsumeStatus

name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876"  # 火山引擎控制台展示的TCP接入点
topic = ""  # 在火山引擎控制台Topic管理页面创建的topic名称
group = ""  # 在火山引擎控制台Group管理页面创建的group名称
access_key = ""  # RocketMQ实例密钥管理页面获取到的AccessKey ID
access_secret = ""  # RocketMQ实例密钥管理页面获取到的AccessKey Secret
tags = ""  # 消费消息匹配的tag,多个tag使用||进行分隔

def callback(msg):
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS

consumer = PushConsumer(group)
consumer.set_name_server_address(name_server_addr)
consumer.set_session_credentials(access_key, access_secret, "")

consumer.subscribe(topic, callback, tags)
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()

广播模式消费

广播模式消费的示例代码如下。

import time

from rocketmq.client import PushConsumer, ConsumeStatus
from rocketmq.ffi import MessageModel

name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876"  # 火山引擎控制台展示的TCP接入点
topic = ""  # 在火山引擎控制台Topic管理页面创建的topic名称
group = ""  # 在火山引擎控制台Group管理页面创建的group名称
access_key = ""  # RocketMQ实例密钥管理页面获取到的AccessKey ID
access_secret = ""  # RocketMQ实例密钥管理页面获取到的AccessKey Secret
tags = ""  # 消费消息匹配的tag,多个tag使用||进行分隔

def callback(msg):
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS

consumer = PushConsumer(group, message_model=MessageModel.BROADCASTING) # 指定消费模式为广播模式
consumer.set_name_server_address(name_server_addr)
consumer.set_session_credentials(access_key, access_secret, "")

consumer.subscribe(topic, callback, tags)
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()