火山引擎消息队列 RocketMQ版提供同步发送、异步发送和单向(Oneway)发送三种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。
您已完成准备工作。
火山引擎消息队列 RocketMQ版提供的普通消息发送方式包括以下三种,您可以根据业务要求选择合适的发送方式。
同步发送 | 异步发送 | 单向发送 | |
---|---|---|---|
发送方式 | 消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。 | 消息发送方发出一条消息后,不等服务端返回响应,直接发送下一条消息。 | 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。 |
应用场景 | 重要通知邮件、报名短信通知、营销短信系统等。 | 一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。 | 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。 |
TPS | 快 | 快 | 最快 |
是否反馈发送结果 | 反馈 | 反馈 | 不反馈 |
可靠性 | 不丢失消息 | 不丢失消息 | 可能丢失消息 |
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下。
import java.util.Date; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQProducer { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的 GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。 private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { /** * 创建 Producer * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:9876”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook()); producer.setNamesrvAddr("YOUR ACCESS POINT"); //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true) //producer.setUseTLS(true); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { //消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } //在应用退出前,销毁Producer对象。 producer.shutdown(); } }
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
火山引擎消息队列 RocketMQ版的异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
异步发送的示例代码如下。
import java.util.Date; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQAsyncProducer { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。 private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { /** * 创建Producer * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:9876”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook()); producer.setNamesrvAddr("YOUR ACCESS POINT"); //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true) //producer.setUseTLS(true); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // 消费发送成功。 System.out.println("send message success. msgId= " + result.getMsgId()); } @Override public void onException(Throwable throwable) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println("send message failed."); throwable.printStackTrace(); } }); } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // 在应用退出前,销毁Producer对象。 // 注意:如果不销毁也没有问题。 producer.shutdown(); } }
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
单向发送适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集等场景。
单向发送的示例代码如下。
import java.util.Date; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQOnewayProducer { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。 private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { /** * 创建Producer * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.volces.com.com:9876”。 * 设置RocketMQ密钥,即RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook()); producer.setNamesrvAddr("YOUR ACCESS POINT"); //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true) //producer.setUseTLS(true); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // 在应用退出前,销毁Producer对象。 // 注意:如果不销毁也没有问题。 producer.shutdown(); } }
您可以参考以下示例代码订阅普通消息。
import java.util.List; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; public class RocketMQPushConsumer { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。 //设置为火山引擎消息队列 RocketMQ版实例的接入点。 // 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的Topic。 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely()); consumer.setNamesrvAddr("http://{INSTANCE_ID}.rocketmq.volces.com.com:9876"); consumer.subscribe("YOUR TOPIC", "*"); //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true) //consumer.setUseTLS(true); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("Receive New Messages: %s %n", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }