消息队列 RocketMQ版提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 Java SDK 收发顺序消息的示例代码供您参考。
顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。
发送顺序消息时,您可以通过 MessageQueueSelector 回调函数来控制消息投递到哪个分区。
前提条件:您已完成准备工作。
发送顺序消息的示例代码如下。
import java.util.List; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ public class RocketMQOrderProducer { private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { /** * 创建Producer * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.volces.com.com:9876”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook()); producer.setNamesrvAddr("YOUR ACCESS POINT"); //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true) //producer.setUseTLS(true); producer.start(); for (int i = 0; i < 128; i++) { try { int orderId = i % 10; Message msg = new Message("YOUR ORDER TOPIC", "YOUR MESSAGE TAG", "OrderID xxx", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 选择适合自己的分区选择算法,保证同一个参数得到的结果相同。 Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
订阅顺序消息的示例代码如下。
import java.util.List; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; /** * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ public class RocketMQOrderConsumer { private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY")); } public static void main(String[] args) throws MQClientException { /** * 创建Consumer * 设置为您从火山引擎控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.volces.com.com:9876。并 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely()); consumer.setNamesrvAddr("YOUR ACCESS POINT"); consumer.subscribe("YOUR ORDER TOPIC", "*"); //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true) //consumer.setUseTLS(true); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeOrderlyStatus.SUCCESS;// 消费失败则挂起重试返回:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }