You need to enable JavaScript to run this app.
导航
顺序消息
最近更新时间:2023.04.21 14:05:13首次发布时间:2022.01.28 15:16:40

消息队列 RocketMQ版提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 Java SDK 收发顺序消息的示例代码供您参考。

背景信息

顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。

  • 全局顺序:
    对于指定的一个 Topic,所有消息的生产和消费需要遵循一定的顺序,消息的消费顺序必须和生产顺序一致,即需要严格的先入先出 FIFO(First In First Out)的顺序进行发布和消费。
  • 分区顺序:
    对于指定的一个 Topic,其中每一个分区的消息生产与消费是有序的,同一个队列内的消息按照严格的 FIFO 顺序进行发布和订阅。消息投递到哪一个分区由消息的 Sharding Key 来进行区分。在 SDK 中可以通过指定 Sharding Key 和 MessageQueueSelector 回调函数来控制消息投递到哪个分区。

前提条件

您已完成准备工作

发送顺序消息

发送顺序消息的示例代码如下。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
 */
public class RocketMQOrderProducer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * 创建Producer
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.volces.com.com:9876”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook());
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true)
        //producer.setUseTLS(true);
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("YOUR ORDER TOPIC",
                    "YOUR MESSAGE TAG",
                    "OrderID xxx",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 选择适合自己的分区选择算法,保证同一个参数得到的结果相同。
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

订阅顺序消息

订阅顺序消息的示例代码如下。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
/**
 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
 */
public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {

        /**
         * 创建Consumer
         * 设置为您从火山引擎控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.volces.com.com:9876。并
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        consumer.subscribe("YOUR ORDER TOPIC", "*");
        //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true)
        //consumer.setUseTLS(true);

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;// 消费失败则挂起重试返回:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}