消息队列 RocketMQ版提供 TCP 协议下的 RocketMQ 开源 Python SDK 的相关说明,本文档介绍收发定时消息和延时消息的示例代码。
如果发送消息到消息队列 RocketMQ版服务端后,不希望立即投递消息,可以使用定时或延时消息,根据消息中指定的属性延迟一定时间投递或指定时间点投递至消费端。其中,推迟到后续的某个指定时间再投递到消费端进行消费的消息为定时消息。推迟一定时间再投递到消费端进行消费的消息为延时消息,例如指定在消息发送时间的 30 分钟之后进行投递。
火山引擎消息队列 RocketMQ版提供了两种发送延时消息的方式,一种是特定延时时间,另一种是任意延时时间。您可以通过消息属性中的定时时间实现消息的定时发送,其中任意精度的延时消息包括以下两种。
特定精度延时消息,只支持特定的 18 个等级。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
任意精度延时消息:对于 2023年2月21日之后创建的 RocketMQ 实例,建议通过属性 __STARTDELIVERTIME
来使用任意精度的延时消息。
说明
开源 Apache RocketMQ SDK 支持延时消息,但不支持定时消息,因此没有专门的定时消息接口。
发送定时消息或延时消息之前,请确认已完成以下操作。
类别 | 注意事项 |
---|---|
延时时长 | 延时消息支持自定义毫秒级延时,延时时长最长为 3 天或消息保留时长的 3 倍(两者取较小值)。 |
定时时间 |
|
时间精度 |
|
Exactly Only Once | 消息队列 RocketMQ版无法确保定时消息仅投递一次,某些定时消息可能会被重复投递。 |
存储空间 | 定时消息的存储资源消耗较大,同样的消息内容,定时消息的存储空间占用量约为普通消息的 3 倍,频繁使用定时消息时需要注意存储空间的占用情况。 |
RocketMQ 开源 Python SDK 发送定时消息或延时消息的示例代码如下。
import datetime from rocketmq.client import Producer, Message, SendResult from rocketmq.exceptions import RocketMQException producer_group = "" # 生产者group name_server_addr = "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876" # 火山引擎控制台展示的TCP接入点 topic = "test" # 在火山引擎控制台Topic管理页面创建的topic名称 access_key = "ACCESS_KEY" # RocketMQ实例密钥管理页面获取到的AccessKey ID access_secret = "SECRET_KEY" # RocketMQ实例密钥管理页面获取到的AccessKey Secret key = "test" # 消息key tag = "test-tag" # 消息tag # 初始化生产者 producer = Producer(producer_group) producer.set_namesrv_addr(name_server_addr) producer.set_session_credentials(access_key, access_secret, "") # 启动生产者 try: producer.start() except RocketMQException as e: print('start producer error:', e) exit(1) # 创建消息 msg = Message(topic) msg.set_keys(key) msg.set_tags(tag) msg.set_body("hello volcano engine python") delay_time = 30 # 延时时间 delay_timestamp = int((datetime.datetime.now() + datetime.timedelta(seconds=delay_time)).timestamp() * 1000) /** *若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2021-08-10 18:45:00投递。 *定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。 * delay_timestamp = int(datetime(2021, 8, 10, 18, 45, 0).timestamp() * 1000) * msg.set_property("__STARTDELIVERTIME", str(delay_timestamp)) */ msg.set_property('__STARTDELIVERTIME', str(delay_timestamp)) # 发送消息 try: result = producer.send_sync(msg) except RocketMQException as e: print('send message error:', e) producer.shutdown() exit(1) print('send result:', result) # 关闭生产者 producer.shutdown()